[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f29f8057 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f29f8057 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f29f8057 Branch: refs/heads/master Commit: f29f80575dac1c7e59dd7074118953b8be26520f Parents: 3edbb7b Author: Stephan Ewen <se...@apache.org> Authored: Tue Jul 25 17:19:25 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Nov 17 16:48:29 2017 +0100 ---------------------------------------------------------------------- .../org/apache/flink/core/fs/FileSystem.java | 5 + .../apache/flink/core/fs/FileSystemKind.java | 40 ++++++++ .../core/fs/SafetyNetWrapperFileSystem.java | 5 + .../flink/core/fs/local/LocalFileSystem.java | 10 +- .../core/fs/local/LocalFileSystemTest.java | 7 ++ .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 47 +++++++++ .../flink/runtime/fs/maprfs/MapRFileSystem.java | 6 ++ .../flink/runtime/fs/hdfs/HdfsKindTest.java | 101 +++++++++++++++++++ 8 files changed, 219 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index d66a893..982e496 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -633,6 +633,11 @@ public abstract class FileSystem { */ public abstract boolean isDistributedFS(); + /** + * Gets a description of the characteristics of this file system. + */ + public abstract FileSystemKind getKind(); + // ------------------------------------------------------------------------ // output directory initialization // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java new file mode 100644 index 0000000..52f58ac --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An enumeration defining the kind and characteristics of a {@link FileSystem}. + */ +@PublicEvolving +public enum FileSystemKind { + + /** + * An actual file system, with files and directories. + */ + FILE_SYSTEM, + + /** + * An Object store. Files correspond to objects. + * There are not really directories, but a directory-like structure may be mimicked + * by hierarchical naming of files. + */ + OBJECT_STORE +} http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index a1167dd..e7f43a4 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -141,6 +141,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr } @Override + public FileSystemKind getKind() { + return unsafeFileSystem.getKind(); + } + + @Override public FileSystem getWrappedDelegate() { return unsafeFileSystem; } http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java index ecfd21c..a96f221 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java @@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.util.OperatingSystem; @@ -283,13 +284,18 @@ public class LocalFileSystem extends FileSystem { return false; } + @Override + public FileSystemKind getKind() { + return FileSystemKind.FILE_SYSTEM; + } + // ------------------------------------------------------------------------ /** * Gets the URI that represents the local file system. * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other * UNIX family platforms. - * + * * @return The URI that represents the local file system. */ public static URI getLocalFsURI() { @@ -298,7 +304,7 @@ public class LocalFileSystem extends FileSystem { /** * Gets the shared instance of this file system. - * + * * @return The shared instance of this file system. */ public static LocalFileSystem getSharedInstance() { http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java index 2312ee9..96c5269 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java @@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.util.FileUtils; @@ -312,4 +313,10 @@ public class LocalFileSystemTest { assertTrue(fs.rename(new Path(srcFolder.toURI()), new Path(dstFolder.toURI()))); assertTrue(new File(dstFolder, srcFile.getName()).exists()); } + + @Test + public void testKind() { + final FileSystem fs = FileSystem.getLocalFileSystem(); + assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 5970c9d..7bc5a0f 100644 --- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -21,10 +21,12 @@ package org.apache.flink.runtime.fs.hdfs; import org.apache.flink.core.fs.BlockLocation; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import java.io.IOException; import java.net.URI; +import java.util.Locale; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -36,6 +38,11 @@ public class HadoopFileSystem extends FileSystem { /** The wrapped Hadoop File System. */ private final org.apache.hadoop.fs.FileSystem fs; + /* This field caches the file system kind. It is lazily set because the file system + * URL is lazily initialized. */ + private FileSystemKind fsKind; + + /** * Wraps the given Hadoop File System object as a Flink File System object. * The given Hadoop file system object is expected to be initialized already. @@ -168,4 +175,44 @@ public class HadoopFileSystem extends FileSystem { public boolean isDistributedFS() { return true; } + + @Override + public FileSystemKind getKind() { + if (fsKind == null) { + fsKind = getKindForScheme(this.fs.getUri().getScheme()); + } + return fsKind; + } + + /** + * Gets the kind of the file system from its scheme. + * + * <p>Implementation note: Initially, especially within the Flink 1.3.x line + * (in order to not break backwards compatibility), we must only label file systems + * as 'inconsistent' or as 'not proper filesystems' if we are sure about it. + * Otherwise, we cause regression for example in the performance and cleanup handling + * of checkpoints. + * For that reason, we initially mark some filesystems as 'eventually consistent' or + * as 'object stores', and leave the others as 'consistent file systems'. + */ + static FileSystemKind getKindForScheme(String scheme) { + scheme = scheme.toLowerCase(Locale.US); + + if (scheme.startsWith("s3") || scheme.startsWith("emr")) { + // the Amazon S3 storage + return FileSystemKind.OBJECT_STORE; + } + else if (scheme.startsWith("http") || scheme.startsWith("ftp")) { + // file servers instead of file systems + // they might actually be consistent, but we have no hard guarantees + // currently to rely on that + return FileSystemKind.OBJECT_STORE; + } + else { + // the remainder should include hdfs, kosmos, ceph, ... + // this also includes federated HDFS (viewfs). + return FileSystemKind.FILE_SYSTEM; + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java index 058772c..5aec4a4 100644 --- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java +++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.fs.maprfs; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.slf4j.Logger; @@ -172,4 +173,9 @@ public class MapRFileSystem extends HadoopFileSystem { throw new IOException(String.format( "Unable to find CLDB locations for cluster %s", authority)); } + + @Override + public FileSystemKind getKind() { + return FileSystemKind.FILE_SYSTEM; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java new file mode 100644 index 0000000..69ecdb8 --- /dev/null +++ b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.fs.hdfs; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for extracting the {@link FileSystemKind} from file systems that Flink + * accesses through Hadoop's File System interface. + * + * <p>This class needs to be in this package, because it accesses package private methods + * from the HDFS file system wrapper class. + */ +public class HdfsKindTest extends TestLogger { + + @Test + public void testHdfsKind() throws IOException { + final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem(); + assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind()); + } + + @Test + public void testS3Kind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3.S3FileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path"); + return; + } + + final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind()); + } + + @Test + public void testS3nKind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path"); + return; + } + + final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind()); + } + + @Test + public void testS3aKind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path"); + return; + } + + final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind()); + } + + @Test + public void testS3fileSystemSchemes() { + assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3")); + assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n")); + assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a")); + assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS")); + } + + @Test + public void testViewFs() { + assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs")); + } +}