venkata91 commented on code in PR #23164:
URL: https://github.com/apache/flink/pull/23164#discussion_r1291641821
##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java:
##########
@@ -360,11 +362,27 @@ List<String> registerProvidedLocalResources() {
envShipResourceList.add(descriptor);
if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) {
- classPaths.add(fileName);
+ URI parentDirectoryUri = new Path(fileName).getParent().toUri();
+ String relativeParentDirectory =
+ new Path(filePath.getName())
+ .toUri()
+ .relativize(parentDirectoryUri)
+ .toString();
+
+ if (!resourcesDir.contains(relativeParentDirectory)) {
+ resourcesDir.add(relativeParentDirectory);
+ }
+ resources.add(fileName);
} else if (isFlinkDistJar(filePath.getName())) {
flinkDist = descriptor;
}
});
+
+ // Construct classpath where resource directories go first followed
+ // by resource files. We also sort both resources and resource directories in
Review Comment:
nit: replace `We also sort both resources and resource directories in` with
`Sort both resources and resource directories in`.
##########
flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationFileUploaderTest.java:
##########
@@ -73,6 +74,38 @@ void testRegisterProvidedLocalResources(@TempDir File flinkLibDir) throws IOExce
}
}
+ @Test
+ void testRegisterProvidedLocalResourcesWithParentDir(@TempDir File flinkLibDir)
+ throws IOException {
+ final Map<String, String> filesWithParentDir = getFilesWithParentDir();
+
+ generateFilesInDirectory(flinkLibDir, filesWithParentDir);
+
+ try (final YarnApplicationFileUploader yarnApplicationFileUploader =
+ YarnApplicationFileUploader.from(
+ FileSystem.get(new YarnConfiguration()),
+ new Path(flinkLibDir.toURI()),
+ Collections.singletonList(new Path(flinkLibDir.toURI())),
+ ApplicationId.newInstance(0, 0),
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT)) {
+
+ List<String> classPath = yarnApplicationFileUploader.registerProvidedLocalResources();
+
+ Set<String> expectedClassPathEntries = new HashSet<>();
+ Set<String> resources = new HashSet<>();
+ Set<String> resourcesDir = new HashSet<>();
+ for (String filePath : filesWithParentDir.keySet()) {
+ String parentDir = new Path(filePath).getParent().toString();
+ resourcesDir.add(parentDir);
+ resources.add(filePath);
+ }
+ resourcesDir.stream().sorted().forEach(expectedClassPathEntries::add);
+ resources.stream().sorted().forEach(expectedClassPathEntries::add);
+
+ assertThat(classPath).containsExactlyInAnyOrderElementsOf(expectedClassPathEntries);
Review Comment:
How are we ensuring that the classpath is in the expected order? `Set` doesn't
guarantee ordering, so `expectedClassPathEntries` should be a `List<String>`
instead of a `Set<String>`. I'd also prefer to construct
`expectedClassPathEntries` manually rather than programmatically, for stronger
validation.
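
To illustrate the ordering concern, here is a minimal standalone sketch in plain Java. It is not the Flink code: `ClassPathOrderSketch` and `buildClassPath` are hypothetical names, the file names are made up, and the parent directory is derived with simple string handling rather than Hadoop's `Path`. It only shows that a set-based comparison accepts a wrongly ordered classpath, while a hand-written `List` comparison rejects it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.TreeSet;

public class ClassPathOrderSketch {

    // Hypothetical helper (an assumption, not the Flink implementation):
    // sorted parent directories first, followed by sorted resource files.
    static List<String> buildClassPath(Collection<String> files) {
        TreeSet<String> dirs = new TreeSet<>();
        for (String file : files) {
            int slash = file.lastIndexOf('/');
            if (slash > 0) {
                dirs.add(file.substring(0, slash));
            }
        }
        List<String> classPath = new ArrayList<>(dirs);
        classPath.addAll(new TreeSet<>(files));
        return classPath;
    }

    public static void main(String[] args) {
        List<String> actual =
                buildClassPath(Arrays.asList("lib/b.jar", "conf/a.xml", "lib/a.jar"));

        // Expected entries written out by hand, in the exact order required.
        List<String> expected =
                Arrays.asList("conf", "lib", "conf/a.xml", "lib/a.jar", "lib/b.jar");

        // A deliberately wrong ordering: files before directories.
        List<String> wrongOrder =
                Arrays.asList("conf/a.xml", "lib/a.jar", "lib/b.jar", "conf", "lib");

        // Set equality ignores order, so the wrong ordering goes unnoticed.
        boolean setCheckPasses = new HashSet<>(wrongOrder).equals(new HashSet<>(expected));

        // List equality is order-sensitive, so the wrong ordering is caught.
        boolean listCheckPasses = wrongOrder.equals(expected);

        System.out.println("actual matches expected list: " + actual.equals(expected));
        System.out.println("set check passes on wrong order: " + setCheckPasses);
        System.out.println("list check passes on wrong order: " + listCheckPasses);
    }
}
```

In the test itself, the equivalent change would presumably be a hard-coded `List<String>` of expected entries checked with AssertJ's order-sensitive `containsExactlyElementsOf` (or `containsExactly`) instead of `containsExactlyInAnyOrderElementsOf`.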