Github user zhangminglei commented on a diff in the pull request:
https://github.com/apache/flink/pull/4156#discussion_r124251147
--- Diff:
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
---
@@ -255,4 +255,75 @@ class MemoryArchivist(
graphs.remove(jobID)
}
}
+
+ /**
+ * Checks and normalizes the archive path URI. This method first checks the validity of the
+ * URI (scheme, path, availability of a matching file system) and then normalizes the URL
+ * to a path.
+ *
+ * If the URI does not include an authority, but the file system configured for the URI has an
+ * authority, then the normalized path will include this authority.
+ *
+ * @param archivePathUri The URI to check and normalize.
+ * @return a normalized URI as a Path.
+ *
+ * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
+ * @throws IOException Thrown, if no file system can be found for the URI's scheme.
+ */
+ @throws[IOException]
+ private def validateAndNormalizeUri(archivePathUri: URI): Path = {
+ val scheme = archivePathUri.getScheme
+ val path = archivePathUri.getPath
+
+ // some validity checks
+ if (scheme == null) {
+ throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+ "Please specify the file system scheme explicitly in the URI.")
+ }
+
+ if (path == null) {
+ throw new IllegalArgumentException("The path to store the archive job is null. " +
+ "Please specify a directory path for archive.")
+ }
+
+ if (path.length == 0 || path == "/") {
+ throw new IllegalArgumentException("Cannot use the root directory for archive.")
+ }
+ if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
+ // skip verification checks for non-flink supported filesystem
+ // this is because the required filesystem classes may not be available to the flink client
+ new Path(archivePathUri)
+ }
+ else {
--- End diff --
I am not sure which schemes Flink supports. From the existing code I only
know of ```hdfs``` and ```file```. If I knew the full list of supported
schemes, I would probably change the check to something like
```if (scheme.startsWith("hdfs") || scheme.startsWith("file"))```.
Please check the newest code again and give some advice. :XD
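
To make the idea concrete, here is a minimal sketch of the kind of check I have in mind. It assumes that only ```hdfs``` and ```file``` count as known schemes, which is my guess and not something the code confirms. `SchemeCheckSketch`, `assumedSchemes` and `hasAssumedScheme` are hypothetical names, not part of Flink. It compares the whole scheme instead of using `startsWith`, so an unrelated scheme that merely begins with "file" would not slip through.

```scala
import java.net.URI
import java.util.Locale

object SchemeCheckSketch {
  // Assumption: only these two schemes are treated as known here;
  // the set that Flink actually supports may be larger.
  val assumedSchemes: Set[String] = Set("hdfs", "file")

  // Hypothetical helper: true only if the URI has a scheme and that
  // scheme (case-insensitively) is one of the assumed ones.
  def hasAssumedScheme(uri: URI): Boolean =
    Option(uri.getScheme)
      .map(_.toLowerCase(Locale.ROOT))
      .exists(assumedSchemes.contains)
}
```

With this sketch, `SchemeCheckSketch.hasAssumedScheme(new URI("hdfs://namenode:9000/archive"))` would be accepted, while a URI without a scheme such as `new URI("/tmp/archive")` would be rejected without a null check at the call site.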
---