1996fanrui commented on code in PR #23219: URL: https://github.com/apache/flink/pull/23219#discussion_r1305075128
########## flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java: ########## @@ -599,8 +638,13 @@ private void testEnvironmentDirectoryShipping(String environmentVariable, boolea } // only add the ship the folder, not the contents - assertThat(effectiveShipFiles).doesNotContain(libFile).contains(libFolder); - assertThat(descriptor.getShipFiles()).doesNotContain(libFile, libFolder); + assertThat(effectiveShipFiles) + .doesNotContain(new Path(libFile.getAbsolutePath())) + .contains(new Path(libFolder.toURI())); + assertThat(descriptor.getShipFiles()) + .doesNotContain( + new Path(libFile.getAbsolutePath()), + new Path(libFolder.getAbsolutePath())); Review Comment: Some of the files are converted to `Path` via `.toURI()` and others via `.getAbsolutePath()`; is that intended? I'd prefer to introduce a helper similar to `getPathFromLocalFilePathStr`, i.e. a `getPathFromLocalFile` method; otherwise the test code will be unclear. `YarnClusterDescriptor` needs `getPathFromLocalFile` as well, so maybe we should extract a couple of static methods into a util class. ########## flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java: ########## @@ -155,10 +157,19 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { /** True if the descriptor must not shut down the YarnClient. */ private final boolean sharedYarnClient; - /** Lazily initialized list of files to ship. */ - private final List<File> shipFiles = new LinkedList<>(); + /** + * Lazily initialized list of files to ship. The path string for the files which is configured + * by {@code YarnConfigOptions#SHIP_FILES} will be converted to {@code Path} with schema and Review Comment: ```suggestion * by {@link YarnConfigOptions#SHIP_FILES} will be converted to {@link Path} with schema and ``` This should use `{@link}` instead of `{@code}`, because links are clickable in IDEA. The same applies to the `shipArchives` Javadoc. 
########## flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java: ########## @@ -707,6 +751,25 @@ void testShipArchives() throws IOException { YarnConfigOptions.SHIP_ARCHIVES, Arrays.asList(archive1.getAbsolutePath(), archive2.getAbsolutePath())); createYarnClusterDescriptor(flinkConfiguration); + + String archive3 = "hdfs:///flink/archive3.zip"; + final org.apache.hadoop.conf.Configuration hdConf = + new org.apache.hadoop.conf.Configuration(); + hdConf.set( + MiniDFSCluster.HDFS_MINIDFS_BASEDIR, temporaryFolder.toAbsolutePath().toString()); + try (final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdConf).build()) { + final org.apache.hadoop.fs.Path hdfsRootPath = + new org.apache.hadoop.fs.Path(hdfsCluster.getURI()); + hdfsCluster.getFileSystem().createNewFile(new org.apache.hadoop.fs.Path(archive3)); + + flinkConfiguration.set( + YarnConfigOptions.SHIP_ARCHIVES, + Arrays.asList(archive1.getAbsolutePath(), archive3)); + final YarnConfiguration yarnConfig = new YarnConfiguration(); + yarnConfig.set( + CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString()); + createYarnClusterDescriptor(flinkConfiguration, yarnConfig); Review Comment: Could you add an assertion to check that `descriptor.getShipFiles()` is as expected? 
########## flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java: ########## @@ -543,22 +545,59 @@ void testExplicitFileShipping() throws Exception { Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()) .toFile(); - assertThat(descriptor.getShipFiles()).doesNotContain(libFile, libFolder); + assertThat(descriptor.getShipFiles()) + .doesNotContain( + new Path(libFile.getAbsolutePath()), + new Path(libFolder.getAbsolutePath())); - List<File> shipFiles = new ArrayList<>(); - shipFiles.add(libFile); - shipFiles.add(libFolder); + List<Path> shipFiles = new ArrayList<>(); + shipFiles.add(new Path(libFile.getAbsolutePath())); + shipFiles.add(new Path(libFolder.getAbsolutePath())); descriptor.addShipFiles(shipFiles); - assertThat(descriptor.getShipFiles()).contains(libFile, libFolder); + assertThat(descriptor.getShipFiles()) + .contains( + new Path(libFile.getAbsolutePath()), + new Path(libFolder.getAbsolutePath())); // only execute part of the deployment to test for shipped files - Set<File> effectiveShipFiles = new HashSet<>(); + Set<Path> effectiveShipFiles = new HashSet<>(); descriptor.addLibFoldersToShipFiles(effectiveShipFiles); assertThat(effectiveShipFiles).isEmpty(); - assertThat(descriptor.getShipFiles()).hasSize(2).contains(libFile, libFolder); + assertThat(descriptor.getShipFiles()) + .hasSize(2) + .contains( + new Path(libFile.getAbsolutePath()), + new Path(libFolder.getAbsolutePath())); + + String hdfsDir = "hdfs:///flink/hdfs_dir"; + String hdfsFile = "hdfs:///flink/hdfs_file"; + final org.apache.hadoop.conf.Configuration hdConf = + new org.apache.hadoop.conf.Configuration(); + hdConf.set( + MiniDFSCluster.HDFS_MINIDFS_BASEDIR, + temporaryFolder.toAbsolutePath().toString()); + try (final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdConf).build()) { + final org.apache.hadoop.fs.Path hdfsRootPath = + new org.apache.hadoop.fs.Path(hdfsCluster.getURI()); + hdfsCluster.getFileSystem().mkdirs(new 
org.apache.hadoop.fs.Path(hdfsDir)); + hdfsCluster.getFileSystem().createNewFile(new org.apache.hadoop.fs.Path(hdfsFile)); + + Configuration flinkConfiguration = new Configuration(); + flinkConfiguration.set( + YarnConfigOptions.SHIP_FILES, + Arrays.asList( + libFile.getAbsolutePath(), + libFolder.getAbsolutePath(), + hdfsDir, + hdfsFile)); + final YarnConfiguration yarnConfig = new YarnConfiguration(); + yarnConfig.set( + CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString()); + createYarnClusterDescriptor(flinkConfiguration, yarnConfig); + } Review Comment: First of all, this `createYarnClusterDescriptor(flinkConfiguration, yarnConfig);` call is inside a try-with-resources block that already has a `YarnClusterDescriptor descriptor`, so your `createYarnClusterDescriptor(flinkConfiguration, yarnConfig);` call should be moved outside of this `try`. Also, could you add an assertion to check that `descriptor.getShipFiles()` is as expected? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org