Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4156#discussion_r124272868
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
 ---
    @@ -255,4 +255,47 @@ class MemoryArchivist(
           graphs.remove(jobID)
         }
       }
    +
    +  /**
    +    * Checks and normalizes the archive path URI. This method first checks 
the validity of the
    +    * URI (scheme, path, availability of a matching file system) and then 
normalizes the URL
    +    * to a path.
    +    *
    +    * If the URI does not include an authority, but the file system 
configured for the URI has an
    +    * authority, then the normalized path will include this authority.
    +    *
    +    * @param archivePathUri The URI to check and normalize.
    +    * @return a normalized URI as a Path.
    +    *
    +    * @throws IllegalArgumentException Thrown, if the URI misses schema or 
path.
    +    * @throws IOException Thrown, if no file system can be found for the 
URI's scheme.
    +    */
    +  @throws[IOException]
    +  private def validateAndNormalizeUri(archivePathUri: URI): Path = {
    +    val scheme = archivePathUri.getScheme
    +    val path = archivePathUri.getPath
    +
    +    // some validity checks
    +    if (scheme == null) {
    +      throw new IllegalArgumentException("The scheme (hdfs://, file://, 
etc) is null. " +
    +        "Please specify the file system scheme explicitly in the URI: " + 
archivePathUri)
    +    }
    +
    +    if (path == null) {
    +      throw new IllegalArgumentException("The path to store the job 
archives is null. " +
    +        "Please specify a directory path for storing job archives. and the 
URI is: " + archivePathUri)
    +    }
    +
    +    if (path.length == 0 || path == "/") {
    +      throw new IllegalArgumentException("Cannot use the root directory 
for storing job archives.")
    +    }
    +
    +    if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
    +      // skip verification checks for non-flink supported filesystem
    +      // this is because the required filesystem classes may not be 
available to the flink client
    +      throw new IllegalArgumentException("Cannot use the " + 
archivePathUri.getScheme + " scheme, only hdfs, " +
    --- End diff --
    
    :D I have updated the code. Thanks @zentol VERY careful review. I am very 
appreciate it!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to