liferoad commented on code in PR #33006:
URL: https://github.com/apache/beam/pull/33006#discussion_r1847462508


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java:
##########
@@ -257,6 +260,92 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   /** Dataflow service endpoints are expected to match this pattern. */
   static final String ENDPOINT_REGEXP = "https://[\\S]*googleapis\\.com[/]?";
 
+  /**
+   * Replaces GCS file paths with local file paths by downloading the GCS files locally. This is
+   * useful when files need to be accessed locally before being staged to Dataflow.
+   *
+   * @param filesToStage List of file paths that may contain GCS paths (gs://) and local paths
+   * @return List of local file paths where any GCS paths have been downloaded locally
+   * @throws RuntimeException if there are errors copying GCS files locally
+   */
+  public static List<String> replaceGcsFilesWithLocalFiles(List<String> filesToStage) {
+    List<String> processedFiles = new ArrayList<>();
+
+    for (String fileToStage : filesToStage) {
+      String localPath;
+      if (fileToStage.contains("=")) {
+        // Handle files with staging name specified
+        String[] components = fileToStage.split("=", 2);
+        String stagingName = components[0];
+        String filePath = components[1];
+
+        if (filePath.startsWith("gs://")) {
+          try {
+            // Create temp file with exact same name as GCS file
+            String gcsFileName = filePath.substring(filePath.lastIndexOf('/') + 1);
+            File tempDir = Files.createTempDir();
+            tempDir.deleteOnExit();
+            File tempFile = new File(tempDir, gcsFileName);
+            tempFile.deleteOnExit();
+
+            LOG.info(
+                "Downloading GCS file {} to local temp file {}",
+                filePath,
+                tempFile.getAbsolutePath());
+
+            // Copy GCS file to local temp file
+            ResourceId source = FileSystems.matchNewResource(filePath, false);
+            try (ReadableByteChannel reader = FileSystems.open(source);
+                FileOutputStream writer = new FileOutputStream(tempFile)) {
+              ByteStreams.copy(Channels.newInputStream(reader), writer);

Review Comment:
   ```
   Expect srcResourceIds and destResourceIds have the same scheme, but received gs, file.
   java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received gs, file.
   ```
   Looks like our copy (`FileSystems.copy`) needs the source and destination to use the same scheme, so it can't go from gs:// to a local file. :)
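
For illustration, here is a minimal sketch of the scheme-agnostic approach the diff takes: streaming bytes from a readable channel into a writable one, rather than asking a single filesystem to copy between schemes. It uses only the JDK so it runs standalone. The class `CrossSchemeCopy` and the helper `copyChannel` are hypothetical names, and the in-memory source is an assumption standing in for the channel that `FileSystems.open(source)` returns in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class CrossSchemeCopy {

  /**
   * Hypothetical helper: drains {@code source} into {@code dest} and returns the
   * number of bytes written. Neither channel needs to know the other's scheme.
   */
  public static long copyChannel(ReadableByteChannel source, WritableByteChannel dest)
      throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(8192);
    long total = 0;
    while (source.read(buffer) != -1) {
      buffer.flip();
      // A single write may be partial, so loop until the buffer is empty.
      while (buffer.hasRemaining()) {
        total += dest.write(buffer);
      }
      buffer.clear();
    }
    return total;
  }

  public static void main(String[] args) throws IOException {
    // Assumption: this in-memory channel stands in for a remote (e.g. gs://) source.
    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    Path tempFile = Files.createTempFile("staged-", ".bin");
    tempFile.toFile().deleteOnExit();

    try (ReadableByteChannel reader = Channels.newChannel(new ByteArrayInputStream(payload));
        WritableByteChannel writer =
            Files.newByteChannel(
                tempFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
      long copied = copyChannel(reader, writer);
      System.out.println("Copied " + copied + " bytes to " + tempFile);
    }
  }
}
```

Because the helper only sees channels, the same loop would work whichever filesystem produced the source, which is why the channel-based copy in the diff avoids the same-scheme check.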



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

