Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4156#discussion_r124253131
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---
    @@ -255,4 +255,75 @@ class MemoryArchivist(
           graphs.remove(jobID)
         }
       }
    +
    +  /**
    +    * Checks and normalizes the archive path URI. This method first checks the validity of the
    +    * URI (scheme, path, availability of a matching file system) and then normalizes the URL
    +    * to a path.
    +    *
    +    * If the URI does not include an authority, but the file system configured for the URI has an
    +    * authority, then the normalized path will include this authority.
    +    *
    +    * @param archivePathUri The URI to check and normalize.
    +    * @return a normalized URI as a Path.
    +    *
    +    * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
    +    * @throws IOException Thrown, if no file system can be found for the URI's scheme.
    +    */
    +  @throws[IOException]
    +  private def validateAndNormalizeUri(archivePathUri: URI): Path = {
    +    val scheme = archivePathUri.getScheme
    +    val path = archivePathUri.getPath
    +
    +    // some validity checks
    +    if (scheme == null) {
    +      throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
    +        "Please specify the file system scheme explicitly in the URI.")
    +    }
    +
    +    if (path == null) {
    +      throw new IllegalArgumentException("The path to store the archive job is null. " +
    +        "Please specify a directory path for archive.")
    +    }
    +
    +    if (path.length == 0 || path == "/") {
    +      throw new IllegalArgumentException("Cannot use the root directory for archive.")
    +    }
    +    if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
    +      // skip verification checks for non-flink supported filesystem
    +      // this is because the required filesystem classes may not be available to the flink client
    +      new Path(archivePathUri)
    +    }
    +    else {
    --- End diff --
    
    You could also just use `FileSystem.isFlinkSupportedScheme`, which was used in the original method,
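
For reference, the checks in the diff above can be sketched without any Flink classes. This is only an illustration under stated assumptions: `ArchivePathCheck`, `checkArchiveUri`, `isAssumedSupportedScheme` and the scheme set are hypothetical names invented here, and the scheme set is a stand-in for whatever `FileSystem.isFlinkSupportedScheme` actually accepts. It is not the code from the PR.

```scala
import java.net.URI

object ArchivePathCheck {
  // Assumption: a hypothetical stand-in for FileSystem.isFlinkSupportedScheme.
  // The real list of supported schemes lives in Flink and may differ.
  private val assumedSupportedSchemes: Set[String] = Set("file", "hdfs")

  def isAssumedSupportedScheme(scheme: String): Boolean =
    assumedSupportedSchemes.contains(scheme)

  /** Mirrors the three validity checks from the diff, in the same order.
    * Returns Right(uri) if all checks pass, otherwise Left with a short reason.
    */
  def checkArchiveUri(uri: URI): Either[String, URI] = {
    val scheme = uri.getScheme
    val path = uri.getPath

    if (scheme == null) Left("missing scheme")
    else if (path == null) Left("missing path")
    else if (path.isEmpty || path == "/") Left("root directory not allowed")
    else Right(uri)
  }
}
```

In this sketch a URI such as `hdfs://namenode:9000/archive` passes, a bare `/tmp/archive` is rejected for its missing scheme, and `file:///` is rejected as the root directory. Returning an `Either` instead of throwing is a choice made for the sketch only; the method in the diff throws `IllegalArgumentException`.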

