ibzib commented on a change in pull request #14520:
URL: https://github.com/apache/beam/pull/14520#discussion_r612895287



##########
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/resources/PipelineResourcesTest.java
##########
@@ -102,8 +100,57 @@ public void testPackagingDirectoryResourceToJarFile() throws IOException {
 
   @Test
   public void testIfThrowsWhenThereIsNoTemporaryFolderForJars() throws IOException {
-    List<String> filesToStage = new ArrayList<>();
-    filesToStage.add(tmpFolder.newFolder().getAbsolutePath());
+    List<String> filesToStage = Arrays.asList(tmpFolder.newFolder().getAbsolutePath());
+
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> PipelineResources.prepareFilesForStaging(filesToStage, null));

Review comment:
       I guess we don't have nullness checks set up for this package, otherwise 
we could statically guarantee this can't be null. 
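For reference, a static nullness checker such as the Checker Framework treats an unannotated `String` parameter as non-null by default. Under such a checker, passing `null` to `prepareFilesForStaging(List<String>, String)` would be rejected at compile time. Without one, the contract has to be enforced at runtime.

Below is a minimal sketch of such a guard. It assumes a hypothetical helper `requireTmpJarLocation`, which is not an existing Beam method, and it uses the message the test asserts on:

```java
// Hypothetical sketch: a runtime guard standing in for a static nullness check.
class TmpLocationGuardSketch {
  static final String MESSAGE =
      "Please provide temporary location for storing the jar files.";

  // Throws IllegalArgumentException (not NullPointerException) to match the test.
  static String requireTmpJarLocation(String tmpJarLocation) {
    if (tmpJarLocation == null) {
      throw new IllegalArgumentException(MESSAGE);
    }
    return tmpJarLocation;
  }

  public static void main(String[] args) {
    try {
      requireTmpJarLocation(null);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

`Objects.requireNonNull` would be shorter. However, it throws `NullPointerException`, which would not satisfy the `assertThrows(IllegalArgumentException.class, ...)` in this test.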

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/resources/PipelineResources.java
##########
@@ -64,6 +70,31 @@
     return resourcePath -> !resourcePath.contains("gradle/wrapper");
   }
 
+  /**
+   * Goes through the list of files that need to be staged on runner. Removes nonexistent
+   * directories and packages existing ones. This is necessary for runners that require filesToStage
+   * to be jars only.
+   *
+   * @param options options object with the files to stage and temp location for staging
+   */
+  public static void prepareFilesForStaging(FileStagingOptions options) {
+    List<String> filesToStage = options.getFilesToStage();
+    if (filesToStage == null || filesToStage.isEmpty()) {
+      filesToStage = detectClassPathResourcesToStage(ReflectHelpers.findClassLoader(), options);

Review comment:
       I did have a problem with the context classloader (which is the first choice 
for `ReflectHelpers.findClassLoader()`) in the Spark runner before (BEAM-10109). 
But the classloader should only be mutated like that when pipeline 
construction and Spark execution happen in the same JVM, so as long as 
we don't rely on `prepareFilesForStaging` for tests, I don't anticipate it 
being a problem.
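Here is a rough illustration of why a mutated context classloader matters. Whatever loader is installed as the thread's context classloader when `prepareFilesForStaging` runs is the one passed to `detectClassPathResourcesToStage`.

The sketch assumes the following context-first lookup order:

1. The thread's context classloader.
2. The class's own loader.
3. The system loader.

The comment above only states that the context classloader is the first choice. The rest of the order is an assumption, and `findClassLoader` here is a hypothetical re-implementation, not Beam's code.

```java
import java.net.URL;
import java.net.URLClassLoader;

class ClassLoaderFallbackSketch {
  // Hypothetical context-first lookup; the real ReflectHelpers.findClassLoader() may differ.
  static ClassLoader findClassLoader() {
    ClassLoader context = Thread.currentThread().getContextClassLoader();
    if (context != null) {
      return context;
    }
    ClassLoader own = ClassLoaderFallbackSketch.class.getClassLoader();
    return own != null ? own : ClassLoader.getSystemClassLoader();
  }

  public static void main(String[] args) throws Exception {
    Thread thread = Thread.currentThread();
    ClassLoader original = thread.getContextClassLoader();
    // Simulates code running in the same JVM installing its own context classloader.
    try (URLClassLoader swapped = new URLClassLoader(new URL[0], original)) {
      thread.setContextClassLoader(swapped);
      System.out.println(findClassLoader() == swapped); // prints true
    } finally {
      thread.setContextClassLoader(original);
    }
  }
}
```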

##########
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/resources/PipelineResourcesTest.java
##########
@@ -102,8 +100,57 @@ public void testPackagingDirectoryResourceToJarFile() throws IOException {
 
   @Test
   public void testIfThrowsWhenThereIsNoTemporaryFolderForJars() throws IOException {
-    List<String> filesToStage = new ArrayList<>();
-    filesToStage.add(tmpFolder.newFolder().getAbsolutePath());
+    List<String> filesToStage = Arrays.asList(tmpFolder.newFolder().getAbsolutePath());
+
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> PipelineResources.prepareFilesForStaging(filesToStage, null));
+
+    assertEquals(
+        "Please provide temporary location for storing the jar files.", exception.getMessage());
+  }
+
+  @Test
+  public void testPrepareFilesForStagingFromOptions() throws IOException {
+    String nonexistentFilePath = tmpFolder.getRoot().getPath() + "/nonexistent/file";
+    String existingFilePath = tmpFolder.newFile("existingFile").getAbsolutePath();
+    List<String> filesToStage = Arrays.asList(nonexistentFilePath, existingFilePath);
+    String temporaryLocation = tmpFolder.newFolder().getAbsolutePath();
+
+    FileStagingOptions options = PipelineOptionsFactory.create().as(FileStagingOptions.class);
+    options.setFilesToStage(filesToStage);
+    options.setTempLocation(temporaryLocation);
+
+    assertThrows(
+        "To-be-staged file does not exist: ",
+        IllegalStateException.class,
+        () -> PipelineResources.prepareFilesForStaging(filesToStage, temporaryLocation));
+  }
+
+  @Test
+  public void testPackagingDirectoryResourceFromOptions() throws IOException {
+    String directoryPath = tmpFolder.newFolder().getAbsolutePath();
+    List<String> filesToStage = Arrays.asList(directoryPath);
+    String temporaryLocation = tmpFolder.newFolder().getAbsolutePath();
+
+    FileStagingOptions options = PipelineOptionsFactory.create().as(FileStagingOptions.class);
+    options.setFilesToStage(filesToStage);
+    options.setTempLocation(temporaryLocation);
+
+    PipelineResources.prepareFilesForStaging(options);
+    List<String> result = options.getFilesToStage();
+
+    assertTrue(new File(result.get(0)).exists());

Review comment:
       Should we also assert there is exactly one item in result?
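Here is one way to express the suggested check in plain Java without JUnit, so it runs standalone. `assertSingleExistingFile` is a hypothetical helper. In the JUnit test itself, this would amount to an `assertEquals(1, result.size())` before the existing `assertTrue`.

Checking the size first also turns an empty result into a clear failure, instead of an `IndexOutOfBoundsException` from `result.get(0)`.

```java
import java.io.File;
import java.util.List;

class SingleResultCheckSketch {
  // Hypothetical helper mirroring the suggested assertions.
  static void assertSingleExistingFile(List<String> result) {
    // Size first, so an empty or oversized result fails with a readable message.
    if (result.size() != 1) {
      throw new AssertionError("expected exactly one staged file, got " + result);
    }
    // Then confirm the single entry actually exists on disk.
    if (!new File(result.get(0)).exists()) {
      throw new AssertionError("staged file does not exist: " + result.get(0));
    }
  }
}
```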

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/resources/PipelineResources.java
##########
@@ -64,6 +70,31 @@
     return resourcePath -> !resourcePath.contains("gradle/wrapper");
   }
 
+  /**
+   * Goes through the list of files that need to be staged on runner. Removes nonexistent

Review comment:
       Don't we error on nonexistent directories, rather than removing them?
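The javadoc's "Removes nonexistent directories" does seem to contradict `testPrepareFilesForStagingFromOptions`, which expects an `IllegalStateException` for a missing path.

Below is a sketch of the fail-fast behavior the test describes. It assumes a hypothetical `packageDirectory` callback in place of the real jar-packaging step.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class FailOnMissingSketch {
  // Hypothetical: `packageDirectory` stands in for the step that turns a
  // directory into a jar under the temporary location.
  static List<String> prepare(List<String> filesToStage, UnaryOperator<String> packageDirectory) {
    List<String> prepared = new ArrayList<>();
    for (String path : filesToStage) {
      File file = new File(path);
      if (!file.exists()) {
        // Fail fast rather than silently dropping the entry.
        throw new IllegalStateException("To-be-staged file does not exist: '" + path + "'");
      }
      // Regular files pass through unchanged; directories are packaged.
      prepared.add(file.isDirectory() ? packageDirectory.apply(path) : path);
    }
    return prepared;
  }
}
```

If that is the intended behavior, the javadoc could say something like "Throws if any file does not exist and packages directories into jars."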

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/resources/PipelineResources.java
##########
@@ -64,6 +70,31 @@
     return resourcePath -> !resourcePath.contains("gradle/wrapper");
   }
 
+  /**
+   * Goes through the list of files that need to be staged on runner. Removes nonexistent
+   * directories and packages existing ones. This is necessary for runners that require filesToStage
+   * to be jars only.
+   *
+   * @param options options object with the files to stage and temp location for staging

Review comment:
       We should mention how this method can mutate `options`. Or we could 
instead return a list of prepared files as before so it's more explicit, like 
`options.setFilesToStage(PipelineResources.prepareFilesForStaging(options))`.
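Here is a sketch of the return-value variant. It uses a hypothetical `StagingOptions` interface in place of `FileStagingOptions`, modeling only the `filesToStage` getter and setter. The method only reads `options`, and the caller decides when to write the result back.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FileStagingOptions, reduced to one property.
interface StagingOptions {
  List<String> getFilesToStage();

  void setFilesToStage(List<String> filesToStage);
}

class SimpleStagingOptions implements StagingOptions {
  private List<String> filesToStage;

  SimpleStagingOptions(List<String> filesToStage) {
    this.filesToStage = filesToStage;
  }

  @Override
  public List<String> getFilesToStage() {
    return filesToStage;
  }

  @Override
  public void setFilesToStage(List<String> filesToStage) {
    this.filesToStage = filesToStage;
  }
}

class ReturnInsteadOfMutateSketch {
  /** Computes the prepared list without modifying {@code options}. */
  static List<String> prepareFilesForStaging(StagingOptions options) {
    List<String> input = options.getFilesToStage();
    if (input == null) {
      // The method under review falls back to classpath detection here;
      // this sketch just returns an empty list.
      return Collections.emptyList();
    }
    List<String> prepared = new ArrayList<>();
    for (String path : input) {
      // Placeholder: the real preparation (e.g. packaging directories) goes here.
      prepared.add(path);
    }
    return Collections.unmodifiableList(prepared);
  }
}
```

The call site then reads `options.setFilesToStage(ReturnInsteadOfMutateSketch.prepareFilesForStaging(options))`. This makes the mutation visible where it happens.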

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/resources/PipelineResources.java
##########
@@ -64,6 +70,31 @@
     return resourcePath -> !resourcePath.contains("gradle/wrapper");
   }
 
+  /**
+   * Goes through the list of files that need to be staged on runner. Removes nonexistent
+   * directories and packages existing ones. This is necessary for runners that require filesToStage
+   * to be jars only.
+   *
+   * @param options options object with the files to stage and temp location for staging
+   */
+  public static void prepareFilesForStaging(FileStagingOptions options) {
+    List<String> filesToStage = options.getFilesToStage();
+    if (filesToStage == null || filesToStage.isEmpty()) {
+      filesToStage = detectClassPathResourcesToStage(ReflectHelpers.findClassLoader(), options);
+      LOG.info(
+          "PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged.",
+          filesToStage.size());
+      LOG.debug("Classpath elements: {}", filesToStage);
+    }
+    options.setFilesToStage(
+        PipelineResources.prepareFilesForStaging(

Review comment:
       Can we make `prepareFilesForStaging(List<String>, String)` private, or 
maybe just merge the two methods?
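Here is a minimal sketch of the first option. It assumes a hypothetical `Options` holder in place of `FileStagingOptions`. The options-based method stays public and the two-argument overload becomes private, so there is a single supported entry point.

```java
import java.util.ArrayList;
import java.util.List;

class SingleEntryPointSketch {
  // Hypothetical minimal options type; the real code takes FileStagingOptions.
  static final class Options {
    final List<String> filesToStage;
    final String tempLocation;

    Options(List<String> filesToStage, String tempLocation) {
      this.filesToStage = filesToStage;
      this.tempLocation = tempLocation;
    }
  }

  /** The only public entry point: callers always go through options. */
  public static List<String> prepareFilesForStaging(Options options) {
    return prepareFilesForStaging(options.filesToStage, options.tempLocation);
  }

  // Private overload: no longer part of the public surface.
  private static List<String> prepareFilesForStaging(List<String> files, String tmpJarLocation) {
    if (tmpJarLocation == null) {
      throw new IllegalArgumentException(
          "Please provide temporary location for storing the jar files.");
    }
    // Placeholder for the existing per-file preparation logic.
    return new ArrayList<>(files);
  }
}
```

One consequence: `testIfThrowsWhenThereIsNoTemporaryFolderForJars` and `testPrepareFilesForStagingFromOptions` currently call the two-argument overload directly. They would need to build a `FileStagingOptions` instead, as `testPackagingDirectoryResourceFromOptions` already does.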




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

