1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1305075128


##########
flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java:
##########
@@ -599,8 +638,13 @@ private void testEnvironmentDirectoryShipping(String environmentVariable, boolean
             }
 
             // only add the ship the folder, not the contents
-            assertThat(effectiveShipFiles).doesNotContain(libFile).contains(libFolder);
-            assertThat(descriptor.getShipFiles()).doesNotContain(libFile, libFolder);
+            assertThat(effectiveShipFiles)
+                    .doesNotContain(new Path(libFile.getAbsolutePath()))
+                    .contains(new Path(libFolder.toURI()));
+            assertThat(descriptor.getShipFiles())
+                    .doesNotContain(
+                            new Path(libFile.getAbsolutePath()),
+                            new Path(libFolder.getAbsolutePath()));

Review Comment:
   Some of the files are converted to `Path` via `toURI()`, and some via `getAbsolutePath()`. Is that intended?
   
   I'd prefer to introduce a method similar to `getPathFromLocalFilePathStr`; we need a `getPathFromLocalFile`. Otherwise the test code will be unclear.
   
   Also, `YarnClusterDescriptor` needs `getPathFromLocalFile` as well. Maybe we should extract a couple of static methods into a util class, for example along the lines of the sketch below.
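   
   As a rough, non-authoritative sketch of such a helper (the class name `ShipFileUtils` is hypothetical, and whether `getPathFromLocalFilePathStr` would delegate to it is an open question):
   
   ```java
   // Hypothetical utility; class and method names are illustrative, not the actual Flink code.
   import java.io.File;
   
   import org.apache.flink.core.fs.Path;
   
   public final class ShipFileUtils {
   
       private ShipFileUtils() {}
   
       /** Converts a local File into a Flink Path that carries an explicit file:// scheme. */
       public static Path getPathFromLocalFile(File localFile) {
           // File#toURI() returns an absolute file:// URI, so the resulting Path has a scheme
           // and an absolute path, which is what the ship-files handling expects.
           return new Path(localFile.toURI());
       }
   }
   ```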



##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -155,10 +157,19 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
     /** True if the descriptor must not shut down the YarnClient. */
     private final boolean sharedYarnClient;
 
-    /** Lazily initialized list of files to ship. */
-    private final List<File> shipFiles = new LinkedList<>();
+    /**
+     * Lazily initialized list of files to ship. The path string for the files which is configured
+     * by {@code YarnConfigOptions#SHIP_FILES} will be converted to {@code Path} with schema and

Review Comment:
   ```suggestion
        * by {@link YarnConfigOptions#SHIP_FILES} will be converted to {@link Path} with schema and
   ```
   
   It should be `link` instead of `code`; a link can be clicked in IDEA.
   
   The same applies to `shipArchives`.
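   
   For reference, a hedged sketch of how the `shipArchives` field and its Javadoc could apply the same `{@link}` treatment (the exact wording of the comment is assumed here):
   
   ```java
   /**
    * Lazily initialized list of archives to ship. The path strings configured by
    * {@link YarnConfigOptions#SHIP_ARCHIVES} will be converted to {@link Path}.
    */
   private final List<Path> shipArchives = new LinkedList<>();
   ```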



##########
flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java:
##########
@@ -707,6 +751,25 @@ void testShipArchives() throws IOException {
                 YarnConfigOptions.SHIP_ARCHIVES,
                 Arrays.asList(archive1.getAbsolutePath(), archive2.getAbsolutePath()));
         createYarnClusterDescriptor(flinkConfiguration);
+
+        String archive3 = "hdfs:///flink/archive3.zip";
+        final org.apache.hadoop.conf.Configuration hdConf =
+                new org.apache.hadoop.conf.Configuration();
+        hdConf.set(
+                MiniDFSCluster.HDFS_MINIDFS_BASEDIR, temporaryFolder.toAbsolutePath().toString());
+        try (final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdConf).build()) {
+            final org.apache.hadoop.fs.Path hdfsRootPath =
+                    new org.apache.hadoop.fs.Path(hdfsCluster.getURI());
+            hdfsCluster.getFileSystem().createNewFile(new org.apache.hadoop.fs.Path(archive3));
+
+            flinkConfiguration.set(
+                    YarnConfigOptions.SHIP_ARCHIVES,
+                    Arrays.asList(archive1.getAbsolutePath(), archive3));
+            final YarnConfiguration yarnConfig = new YarnConfiguration();
+            yarnConfig.set(
+                    CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString());
+            createYarnClusterDescriptor(flinkConfiguration, yarnConfig);

Review Comment:
   Could you add an assert to check whether `descriptor.getShipFiles()` is as expected?
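   
   For instance, a rough sketch of such an assertion (a non-authoritative illustration; it assumes `createYarnClusterDescriptor` returns the descriptor, that the archives are exposed via `getShipArchives()`, and that the local file keeps a `file://` scheme while the HDFS path stays as configured):
   
   ```java
   // Hedged sketch only: getter name and Path normalization are assumptions.
   final YarnClusterDescriptor descriptor =
           createYarnClusterDescriptor(flinkConfiguration, yarnConfig);
   assertThat(descriptor.getShipArchives())
           .contains(new Path(archive1.toURI()), new Path(archive3));
   ```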



##########
flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java:
##########
@@ -543,22 +545,59 @@ void testExplicitFileShipping() throws Exception {
                     Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString())
                             .toFile();
 
-            assertThat(descriptor.getShipFiles()).doesNotContain(libFile, libFolder);
+            assertThat(descriptor.getShipFiles())
+                    .doesNotContain(
+                            new Path(libFile.getAbsolutePath()),
+                            new Path(libFolder.getAbsolutePath()));
 
-            List<File> shipFiles = new ArrayList<>();
-            shipFiles.add(libFile);
-            shipFiles.add(libFolder);
+            List<Path> shipFiles = new ArrayList<>();
+            shipFiles.add(new Path(libFile.getAbsolutePath()));
+            shipFiles.add(new Path(libFolder.getAbsolutePath()));
 
             descriptor.addShipFiles(shipFiles);
 
-            assertThat(descriptor.getShipFiles()).contains(libFile, libFolder);
+            assertThat(descriptor.getShipFiles())
+                    .contains(
+                            new Path(libFile.getAbsolutePath()),
+                            new Path(libFolder.getAbsolutePath()));
 
             // only execute part of the deployment to test for shipped files
-            Set<File> effectiveShipFiles = new HashSet<>();
+            Set<Path> effectiveShipFiles = new HashSet<>();
             descriptor.addLibFoldersToShipFiles(effectiveShipFiles);
 
             assertThat(effectiveShipFiles).isEmpty();
-            assertThat(descriptor.getShipFiles()).hasSize(2).contains(libFile, libFolder);
+            assertThat(descriptor.getShipFiles())
+                    .hasSize(2)
+                    .contains(
+                            new Path(libFile.getAbsolutePath()),
+                            new Path(libFolder.getAbsolutePath()));
+
+            String hdfsDir = "hdfs:///flink/hdfs_dir";
+            String hdfsFile = "hdfs:///flink/hdfs_file";
+            final org.apache.hadoop.conf.Configuration hdConf =
+                    new org.apache.hadoop.conf.Configuration();
+            hdConf.set(
+                    MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+                    temporaryFolder.toAbsolutePath().toString());
+            try (final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdConf).build()) {
+                final org.apache.hadoop.fs.Path hdfsRootPath =
+                        new org.apache.hadoop.fs.Path(hdfsCluster.getURI());
+                hdfsCluster.getFileSystem().mkdirs(new org.apache.hadoop.fs.Path(hdfsDir));
+                hdfsCluster.getFileSystem().createNewFile(new org.apache.hadoop.fs.Path(hdfsFile));
+
+                Configuration flinkConfiguration = new Configuration();
+                flinkConfiguration.set(
+                        YarnConfigOptions.SHIP_FILES,
+                        Arrays.asList(
+                                libFile.getAbsolutePath(),
+                                libFolder.getAbsolutePath(),
+                                hdfsDir,
+                                hdfsFile));
+                final YarnConfiguration yarnConfig = new YarnConfiguration();
+                yarnConfig.set(
+                        CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString());
+                createYarnClusterDescriptor(flinkConfiguration, yarnConfig);
+            }

Review Comment:
   First of all, this `createYarnClusterDescriptor(flinkConfiguration, yarnConfig);` call is inside a try-with-resources block that already has a `YarnClusterDescriptor descriptor`, so it should be moved out of this `try`.
   
   And could you add an assert to check whether `descriptor.getShipFiles()` is as expected?
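   
   As a non-authoritative sketch of what that could look like once the call is moved out of the enclosing `try` (variable names follow the diff above; the exact `Path` values the descriptor stores, e.g. scheme and authority, are assumptions):
   
   ```java
   // Hedged sketch: created outside the try-with-resources block that already declares
   // a YarnClusterDescriptor named `descriptor`, to avoid confusion between the two.
   final YarnClusterDescriptor hdfsAwareDescriptor =
           createYarnClusterDescriptor(flinkConfiguration, yarnConfig);
   assertThat(hdfsAwareDescriptor.getShipFiles())
           .contains(
                   new Path(libFile.toURI()),
                   new Path(libFolder.toURI()),
                   new Path(hdfsDir),
                   new Path(hdfsFile));
   ```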


