[FLINK-7265] [core] Introduce FileSystemKind to differentiate between 
FileSystem and ObjectStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f29f8057
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f29f8057
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f29f8057

Branch: refs/heads/master
Commit: f29f80575dac1c7e59dd7074118953b8be26520f
Parents: 3edbb7b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 25 17:19:25 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Nov 17 16:48:29 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    |   5 +
 .../apache/flink/core/fs/FileSystemKind.java    |  40 ++++++++
 .../core/fs/SafetyNetWrapperFileSystem.java     |   5 +
 .../flink/core/fs/local/LocalFileSystem.java    |  10 +-
 .../core/fs/local/LocalFileSystemTest.java      |   7 ++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  47 +++++++++
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |   6 ++
 .../flink/runtime/fs/hdfs/HdfsKindTest.java     | 101 +++++++++++++++++++
 8 files changed, 219 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d66a893..982e496 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -633,6 +633,11 @@ public abstract class FileSystem {
         */
        public abstract boolean isDistributedFS();
 
+       /**
+        * Gets a description of the characteristics of this file system.
+        *
+        * @return The {@link FileSystemKind} describing this file system.
+        */
+       public abstract FileSystemKind getKind();
+
        // 
------------------------------------------------------------------------
        //  output directory initialization
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
new file mode 100644
index 0000000..52f58ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration defining the kind and characteristics of a {@link FileSystem}.
+ * A file system reports its kind through {@link FileSystem#getKind()}.
+ */
+@PublicEvolving
+public enum FileSystemKind {
+
+       /**
+        * An actual file system, with files and directories.
+        */
+       FILE_SYSTEM,
+
+       /**
+        * An object store. Files correspond to objects.
+        * There are not really directories, but a directory-like structure may be
+        * mimicked by hierarchical naming of files.
+        */
+       OBJECT_STORE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index a1167dd..e7f43a4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -141,6 +141,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem 
implements WrappingPr
        }
 
        @Override
+       public FileSystemKind getKind() {
+               return unsafeFileSystem.getKind(); // pure delegation: the wrapper reports the kind of the file system it wraps
+       }
+
+       @Override
        public FileSystem getWrappedDelegate() {
                return unsafeFileSystem;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index ecfd21c..a96f221 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
 
@@ -283,13 +284,18 @@ public class LocalFileSystem extends FileSystem {
                return false;
        }
 
+       @Override
+       public FileSystemKind getKind() {
+               return FileSystemKind.FILE_SYSTEM; // constant: the local file system is always reported as a file system with files and directories
+       }
+
        // 
------------------------------------------------------------------------
 
        /**
         * Gets the URI that represents the local file system.
         * That URI is {@code "file:/"} on Windows platforms and {@code 
"file:///"} on other
         * UNIX family platforms.
-        * 
+        *
         * @return The URI that represents the local file system.
         */
        public static URI getLocalFsURI() {
@@ -298,7 +304,7 @@ public class LocalFileSystem extends FileSystem {
 
        /**
         * Gets the shared instance of this file system.
-        * 
+        *
         * @return The shared instance of this file system.
         */
        public static LocalFileSystem getSharedInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 2312ee9..96c5269 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.util.FileUtils;
@@ -312,4 +313,10 @@ public class LocalFileSystemTest {
                assertTrue(fs.rename(new Path(srcFolder.toURI()), new 
Path(dstFolder.toURI())));
                assertTrue(new File(dstFolder, srcFile.getName()).exists());
        }
+
+       @Test
+       public void testKind() {
+               final FileSystem fs = FileSystem.getLocalFileSystem();
+               assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind()); // the local file system must never be classified as an object store
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 5970c9d..7bc5a0f 100644
--- 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -21,10 +21,12 @@ package org.apache.flink.runtime.fs.hdfs;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Locale;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -36,6 +38,11 @@ public class HadoopFileSystem extends FileSystem {
        /** The wrapped Hadoop File System. */
        private final org.apache.hadoop.fs.FileSystem fs;
 
+       /* This field caches the file system kind. It is lazily set because the
+        * file system URI is lazily initialized. */
+       private FileSystemKind fsKind;
+
+
        /**
         * Wraps the given Hadoop File System object as a Flink File System 
object.
         * The given Hadoop file system object is expected to be initialized 
already.
@@ -168,4 +175,44 @@ public class HadoopFileSystem extends FileSystem {
        public boolean isDistributedFS() {
                return true;
        }
+
+       @Override
+       public FileSystemKind getKind() {
+               if (fsKind == null) { // not resolved yet: derive the kind from the scheme of the wrapped Hadoop file system's URI
+                       fsKind = getKindForScheme(this.fs.getUri().getScheme()); // NOTE(review): plain field, no synchronization; concurrent first calls may each run this lookup
+               }
+               return fsKind; // cached after the first call
+       }
+
+       /**
+        * Gets the kind of the file system from its scheme.
+        *
+        * <p>The scheme is lower-cased before matching, and matching is by prefix:
+        * schemes starting with {@code s3}, {@code emr}, {@code http}, or {@code ftp}
+        * are classified as {@link FileSystemKind#OBJECT_STORE}; every other scheme
+        * is classified as {@link FileSystemKind#FILE_SYSTEM}.
+        *
+        * <p>Implementation note: Initially, especially within the Flink 1.3.x line
+        * (in order to not break backwards compatibility), we must only label a file
+        * system as 'not a proper file system' (that is, as an object store) if we are
+        * sure about it. Otherwise, we cause regressions, for example in the
+        * performance and cleanup handling of checkpoints.
+        * For that reason, only the schemes listed above are marked as object stores,
+        * and all others are left as file systems.
+        *
+        * @param scheme The URI scheme to classify. Must not be null; the method
+        *               dereferences it without a null check.
+        * @return The kind assigned to the given scheme.
+        */
+       static FileSystemKind getKindForScheme(String scheme) {
+               scheme = scheme.toLowerCase(Locale.US);
+
+               if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
+                       // the Amazon S3 storage
+                       return FileSystemKind.OBJECT_STORE;
+               }
+               else if (scheme.startsWith("http") || scheme.startsWith("ftp")) 
{
+                       // file servers instead of file systems
+                       // they might actually be consistent, but we have no 
hard guarantees
+                       // currently to rely on that
+                       return FileSystemKind.OBJECT_STORE;
+               }
+               else {
+                       // the remainder should include hdfs, kosmos, ceph, ...
+                       // this also includes federated HDFS (viewfs).
+                       return FileSystemKind.FILE_SYSTEM;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
 
b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 058772c..5aec4a4 100644
--- 
a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ 
b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.fs.maprfs;
 
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import org.slf4j.Logger;
@@ -172,4 +173,9 @@ public class MapRFileSystem extends HadoopFileSystem {
                throw new IOException(String.format(
                                "Unable to find CLDB locations for cluster %s", 
authority));
        }
+
+       @Override
+       public FileSystemKind getKind() {
+               return FileSystemKind.FILE_SYSTEM; // fixed value; replaces the getKind() inherited from HadoopFileSystem
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
new file mode 100644
index 0000000..69ecdb8
--- /dev/null
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for extracting the {@link FileSystemKind} from file systems that Flink
+ * accesses through Hadoop's File System interface.
+ *
+ * <p>This class needs to be in this package, because it accesses package-private
+ * methods from the HDFS file system wrapper class.
+ */
+public class HdfsKindTest extends TestLogger {
+
+       @Test
+       public void testHdfsKind() throws IOException {
+               final FileSystem fs = new 
Path("hdfs://localhost:55445/my/file").getFileSystem();
+               assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
+       }
+
+       @Test
+       public void testS3Kind() throws IOException {
+               try {
+                       Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
+               } catch (ClassNotFoundException ignored) {
+                       // not in the classpath, cannot run this test
+                       log.info("Skipping test 'testS3Kind()' because the S3 
file system is not in the class path");
+                       return;
+               }
+
+               final FileSystem s3 = new 
Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+               assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind());
+       }
+
+       @Test
+       public void testS3nKind() throws IOException {
+               try {
+                       
Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+               } catch (ClassNotFoundException ignored) {
+                       // not in the classpath, cannot run this test
+                       log.info("Skipping test 'testS3nKind()' because the 
Native S3 file system is not in the class path");
+                       return;
+               }
+
+               final FileSystem s3n = new 
Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+               assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind());
+       }
+
+       @Test
+       public void testS3aKind() throws IOException {
+               try {
+                       Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
+               } catch (ClassNotFoundException ignored) {
+                       // not in the classpath, cannot run this test
+                       log.info("Skipping test 'testS3aKind()' because the 
S3AFileSystem is not in the class path");
+                       return;
+               }
+
+               final FileSystem s3a = new 
Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+               assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind());
+       }
+
+       @Test
+       public void testS3fileSystemSchemes() {
+               assertEquals(FileSystemKind.OBJECT_STORE, 
HadoopFileSystem.getKindForScheme("s3"));
+               assertEquals(FileSystemKind.OBJECT_STORE, 
HadoopFileSystem.getKindForScheme("s3n"));
+               assertEquals(FileSystemKind.OBJECT_STORE, 
HadoopFileSystem.getKindForScheme("s3a"));
+               assertEquals(FileSystemKind.OBJECT_STORE, 
HadoopFileSystem.getKindForScheme("EMRFS"));
+       }
+
+       @Test
+       public void testViewFs() {
+               assertEquals(FileSystemKind.FILE_SYSTEM, 
HadoopFileSystem.getKindForScheme("viewfs"));
+       }
+}

Reply via email to