1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1302400685
##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -156,9 +158,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
private final boolean sharedYarnClient;
/** Lazily initialized list of files to ship. */
- private final List<File> shipFiles = new LinkedList<>();
+ private final List<Path> shipFiles = new LinkedList<>();
- private final List<File> shipArchives = new LinkedList<>();
+ private final List<Path> shipArchives = new LinkedList<>();
Review Comment:
How about adding a comment explaining that the option path strings have been
converted to `Path`s with a scheme and an absolute path? That would make it
clearer to other developers how these fields should be used.
##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,25 @@ public class YarnConfigOptions {
.noDefaultValue()
.withDeprecatedKeys("yarn.ship-directories")
.withDescription(
- "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+ "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+ + "cluster. These files/directories can come from the local path of flink client "
+ + "or HDFS. For example, "
Review Comment:
How about updating `from the local client and/or HDFS` to `the local path of
flink client or HDFS` as well?
##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
}
- private Optional<List<File>> decodeFilesToShipToCluster(
+ private Optional<List<Path>> decodeFilesToShipToCluster(
final Configuration configuration, final ConfigOption<List<String>> configOption) {
checkNotNull(configuration);
checkNotNull(configOption);
- final List<File> files =
- ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+ List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+ files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());
return files.isEmpty() ? Optional.empty() : Optional.of(files);
}
+ private Path enrichPathSchemaIfNeeded(Path path) {
+ if (isWithoutSchema(path)) {
+ return new Path(new File(path.toString()).toURI());
Review Comment:
This class contains several occurrences of `new Path(new File(pathStr).toURI())`,
which convert a local path string (`localPathStr`) into a Hadoop `Path`. Could we
extract a single method for this?
##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
}
- private Optional<List<File>> decodeFilesToShipToCluster(
+ private Optional<List<Path>> decodeFilesToShipToCluster(
final Configuration configuration, final ConfigOption<List<String>> configOption) {
checkNotNull(configuration);
checkNotNull(configOption);
- final List<File> files =
- ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+ List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+ files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());
return files.isEmpty() ? Optional.empty() : Optional.of(files);
}
+ private Path enrichPathSchemaIfNeeded(Path path) {
+ if (isWithoutSchema(path)) {
+ return new Path(new File(path.toString()).toURI());
+ }
+ return path;
+ }
+
+ private boolean isWithoutSchema(Path path) {
+ return StringUtils.isNullOrWhitespaceOnly(path.toUri().getScheme());
+ }
Review Comment:
How about changing `Path enrichPathSchemaIfNeeded(Path path)` to
`Path createPathWithSchema(String path)`?
If we do, the line `files =
files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());`
can be removed, and we can simply call `List<Path> files =
ConfigUtils.decodeListFromConfig(configuration, configOption,
this::createPathWithSchema);`
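
To make the suggestion concrete, here is a minimal, self-contained sketch of the idea. It is not the PR's code: `java.net.URI` stands in for Hadoop's `Path`, the class name `PathSchemeSketch` and the `decode` helper are hypothetical, and `decode` only mimics the role of `ConfigUtils.decodeListFromConfig` with a mapper. Strings that `URI.create` rejects (for example, paths containing spaces) are not handled here.

```java
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;

public class PathSchemeSketch {

    /**
     * Hypothetical stand-in for createPathWithSchema(String): if the input has no
     * URI scheme, treat it as a local file and resolve it to an absolute file URI;
     * otherwise keep it unchanged.
     */
    static URI createPathWithSchema(String path) {
        URI uri = URI.create(path);
        String scheme = uri.getScheme();
        if (scheme == null || scheme.trim().isEmpty()) {
            // Same conversion as new File(pathStr).toURI() in the diff above.
            return new File(path).toURI();
        }
        return uri;
    }

    /** Assumed analogue of decoding the config list with a mapper, in one pass. */
    static List<URI> decode(List<String> rawValues) {
        return rawValues.stream()
                .map(PathSchemeSketch::createPathWithSchema)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (URI uri : decode(List.of("hdfs://namenode/flink/lib", "relative/usrlib"))) {
            System.out.println(uri);
        }
    }
}
```

Because the mapper takes the raw string, the conversion happens while the list is decoded, so no second `stream().map(...)` pass is needed afterwards.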