Repository: incubator-beam
Updated Branches:
  refs/heads/master 4f97efc11 -> 28d7913be


[BEAM-59] initial interfaces and classes of Beam FileSystem.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/467f7d17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/467f7d17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/467f7d17

Branch: refs/heads/master
Commit: 467f7d17c4c96bc57b0160c2d4768ceb303bc561
Parents: 4f97efc
Author: Pei He <pe...@google.com>
Authored: Wed Dec 7 17:35:23 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Dec 19 15:20:37 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/FileSystem.java |  29 ++++
 .../apache/beam/sdk/io/FileSystemRegistrar.java |  49 ++++++
 .../org/apache/beam/sdk/io/FileSystems.java     | 155 +++++++++++++++++++
 .../org/apache/beam/sdk/io/LocalFileSystem.java |  27 ++++
 .../beam/sdk/io/LocalFileSystemRegistrar.java   |  41 +++++
 .../org/apache/beam/sdk/io/FileSystemsTest.java | 104 +++++++++++++
 .../sdk/io/LocalFileSystemRegistrarTest.java    |  44 ++++++
 sdks/java/io/google-cloud-platform/pom.xml      |   6 +
 .../beam/sdk/io/gcp/storage/GcsFileSystem.java  |  34 ++++
 .../io/gcp/storage/GcsFileSystemRegistrar.java  |  42 +++++
 .../beam/sdk/io/gcp/storage/package-info.java   |  21 +++
 .../gcp/storage/GcsFileSystemRegistrarTest.java |  51 ++++++
 sdks/java/io/hdfs/pom.xml                       |   6 +
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      |  29 ++++
 .../sdk/io/hdfs/HadoopFileSystemRegistrar.java  |  42 +++++
 .../io/hdfs/HadoopFileSystemRegistrarTest.java  |  52 +++++++
 16 files changed, 732 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
new file mode 100644
index 0000000..d990403
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+/**
+ * File system interface in Beam.
+ *
+ * <p>It defines the APIs used to write file-system-agnostic code.
+ *
+ * <p>All methods are protected and are meant to be implemented by file system providers;
+ * none are declared yet in this initial skeleton.
+ * Clients should use the {@link FileSystems} utility instead.
+ */
+public abstract class FileSystem {
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
new file mode 100644
index 0000000..1d81c1e
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A registrar that creates {@link FileSystem} instances from {@link PipelineOptions}.
+ *
+ * <p>{@link FileSystem} creators have the ability to provide a registrar by creating
+ * a {@link ServiceLoader} entry and a concrete implementation of this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+public interface FileSystemRegistrar {
+  /**
+   * Create a {@link FileSystem} from the given {@link PipelineOptions}.
+   *
+   * @param options the configuration for the file system; declared {@link Nullable}, so
+   *     implementations should be prepared to receive {@code null}
+   * @return the {@link FileSystem} built from {@code options}
+   */
+  FileSystem fromOptions(@Nullable PipelineOptions options);
+
+  /**
+   * Get the URI scheme which defines the namespace of the {@link FileSystemRegistrar}.
+   *
+   * <p>The scheme is required to be unique among all
+   * {@link FileSystemRegistrar FileSystemRegistrars}.
+   *
+   * @return the URI scheme served by this registrar
+   * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
+   */
+  String getScheme();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
new file mode 100644
index 0000000..d086ec6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/**
+ * Client-facing {@link FileSystem} utility.
+ *
+ * <p>Keeps a registry of the {@link FileSystemRegistrar FileSystemRegistrars} discovered through
+ * {@link ServiceLoader}, keyed by lower-cased URI scheme, together with an optional default
+ * {@link PipelineOptions} per scheme.
+ */
+public class FileSystems {
+
+  public static final String DEFAULT_SCHEME = "default";
+
+  private static final Pattern URI_SCHEME_PATTERN = Pattern.compile("^[a-zA-Z][-a-zA-Z0-9+.]*$");
+
+  private static final Map<String, FileSystemRegistrar> SCHEME_TO_REGISTRAR =
+      new ConcurrentHashMap<>();
+
+  private static final Map<String, PipelineOptions> SCHEME_TO_DEFAULT_CONFIG =
+      new ConcurrentHashMap<>();
+
+  static {
+    loadFileSystemRegistrars();
+  }
+
+  /**
+   * Lower-cases a URI scheme independently of the JVM default locale.
+   *
+   * <p>{@link String#toLowerCase()} uses the default locale; under e.g. a Turkish locale
+   * {@code "FILE"} would become {@code "fıle"} (dotless i) and never match {@code "file"}.
+   * URI schemes are ASCII, so {@link Locale#ROOT} gives the intended case folding.
+   *
+   * @param scheme the scheme to normalize, must not be {@code null}
+   * @return the lower-cased scheme
+   */
+  private static String normalizeScheme(String scheme) {
+    return scheme.toLowerCase(Locale.ROOT);
+  }
+
+  /**
+   * Loads available {@link FileSystemRegistrar} services.
+   *
+   * <p>Clears the registry, discovers registrars through {@link ServiceLoader}, verifies that no
+   * two of them claim the same scheme, and registers each one under its lower-cased scheme.
+   *
+   * @throws IllegalStateException if two registrars share a scheme
+   */
+  private static void loadFileSystemRegistrars() {
+    SCHEME_TO_REGISTRAR.clear();
+    // NOTE(review): presumably ObjectsClassComparator orders by class so that iteration is
+    // deterministic -- confirm against ReflectHelpers.
+    Set<FileSystemRegistrar> registrars =
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
+    registrars.addAll(Lists.newArrayList(
+        ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));
+
+    verifySchemesAreUnique(registrars);
+
+    for (FileSystemRegistrar registrar : registrars) {
+      SCHEME_TO_REGISTRAR.put(normalizeScheme(registrar.getScheme()), registrar);
+    }
+  }
+
+  /**
+   * Sets the default configuration to be used with a {@link FileSystemRegistrar} for the provided
+   * {@code scheme}.
+   *
+   * <p>Syntax: <pre>scheme = alpha *( alpha | digit | "+" | "-" | "." )</pre>
+   * Upper case letters are treated as the same as lower case letters.
+   *
+   * @param scheme the URI scheme to configure, must not be {@code null}
+   * @param options the default options for that scheme, must not be {@code null}
+   * @throws NullPointerException if {@code scheme} or {@code options} is {@code null}
+   * @throws IllegalArgumentException if {@code scheme} is not a syntactically valid URI scheme
+   *     or no registrar is registered for it
+   */
+  public static void setDefaultConfig(String scheme, PipelineOptions options) {
+    String lowerCaseScheme = normalizeScheme(checkNotNull(scheme, "scheme"));
+    // Template overloads: the message is only built when the check actually fails.
+    checkArgument(
+        URI_SCHEME_PATTERN.matcher(lowerCaseScheme).matches(),
+        "Scheme: [%s] doesn't match URI syntax: %s",
+        lowerCaseScheme,
+        URI_SCHEME_PATTERN.pattern());
+    checkArgument(
+        SCHEME_TO_REGISTRAR.containsKey(lowerCaseScheme),
+        "No FileSystemRegistrar found for scheme: [%s].",
+        lowerCaseScheme);
+    SCHEME_TO_DEFAULT_CONFIG.put(lowerCaseScheme, checkNotNull(options, "options"));
+  }
+
+  /**
+   * Returns the default configuration registered for {@code scheme} (case-insensitive), or
+   * {@code null} if none has been set.
+   */
+  @VisibleForTesting
+  static PipelineOptions getDefaultConfig(String scheme) {
+    return SCHEME_TO_DEFAULT_CONFIG.get(normalizeScheme(scheme));
+  }
+
+  /**
+   * Internal method to get {@link FileSystem} for {@code uri}.
+   *
+   * <p>A URI without a scheme is treated as a local file. The registrar receives the default
+   * configuration stored for the scheme, which is {@code null} when none was set.
+   */
+  @VisibleForTesting
+  static FileSystem getFileSystemInternal(URI uri) {
+    String lowerCaseScheme = uri.getScheme() != null
+        ? normalizeScheme(uri.getScheme())
+        : LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
+    return getRegistrarInternal(lowerCaseScheme).fromOptions(getDefaultConfig(lowerCaseScheme));
+  }
+
+  /**
+   * Internal method to get {@link FileSystemRegistrar} for {@code scheme}.
+   *
+   * <p>Falls back to the registrar of {@link #DEFAULT_SCHEME} when the scheme itself has none.
+   *
+   * @throws IllegalStateException if neither the scheme nor the default scheme has a registrar
+   */
+  @VisibleForTesting
+  static FileSystemRegistrar getRegistrarInternal(String scheme) {
+    // Single lookups instead of containsKey + get; the map never holds null values.
+    FileSystemRegistrar registrar = SCHEME_TO_REGISTRAR.get(normalizeScheme(scheme));
+    if (registrar == null) {
+      registrar = SCHEME_TO_REGISTRAR.get(DEFAULT_SCHEME);
+    }
+    if (registrar == null) {
+      throw new IllegalStateException("Unable to find registrar for " + scheme);
+    }
+    return registrar;
+  }
+
+  /**
+   * Verifies that no two registrars declare the same scheme, compared case-insensitively.
+   *
+   * @param registrars the registrars to check
+   * @throws IllegalStateException naming the scheme and the conflicting registrar classes if a
+   *     scheme is claimed more than once
+   */
+  @VisibleForTesting
+  static void verifySchemesAreUnique(Set<FileSystemRegistrar> registrars) {
+    Multimap<String, FileSystemRegistrar> registrarsBySchemes =
+        TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());
+
+    for (FileSystemRegistrar registrar : registrars) {
+      registrarsBySchemes.put(normalizeScheme(registrar.getScheme()), registrar);
+    }
+    for (Entry<String, Collection<FileSystemRegistrar>> entry
+        : registrarsBySchemes.asMap().entrySet()) {
+      if (entry.getValue().size() > 1) {
+        // Sort the class names so the error message is stable across runs.
+        String conflictingRegistrars = Joiner.on(", ").join(
+            FluentIterable.from(entry.getValue())
+                .transform(new Function<FileSystemRegistrar, String>() {
+                  @Override
+                  public String apply(@Nonnull FileSystemRegistrar input) {
+                    return input.getClass().getName();
+                  }})
+                .toSortedList(Ordering.<String>natural()));
+        throw new IllegalStateException(String.format(
+            "Scheme: [%s] has conflicting registrars: [%s]",
+            entry.getKey(),
+            conflictingRegistrars));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
new file mode 100644
index 0000000..23c2a92
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+/**
+ * {@link FileSystem} implementation for local files.
+ *
+ * <p>Package-private; instances are created by {@link LocalFileSystemRegistrar}.
+ */
+class LocalFileSystem extends FileSystem {
+
+  /** Creates a local file system; no configuration is taken. */
+  LocalFileSystem() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
new file mode 100644
index 0000000..75a38e8
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for the {@link LocalFileSystem}.
+ */
+@AutoService(FileSystemRegistrar.class)
+public class LocalFileSystemRegistrar implements FileSystemRegistrar {
+
+  // Scheme served by this registrar; FileSystems also uses it for URIs that carry no scheme.
+  static final String LOCAL_FILE_SCHEME = "file";
+
+  /**
+   * Returns a new {@link LocalFileSystem}.
+   *
+   * @param options ignored; may be {@code null}
+   */
+  @Override
+  public FileSystem fromOptions(@Nullable PipelineOptions options) {
+    return new LocalFileSystem();
+  }
+
+  /** Returns the local file scheme, {@code "file"}. */
+  @Override
+  public String getScheme() {
+    return LOCAL_FILE_SCHEME;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
new file mode 100644
index 0000000..9b41b98
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FileSystems}.
+ *
+ * <p>Note: {@link FileSystems} keeps its registry and default configs in static state, so the
+ * default config installed for {@code "file"} by {@link #testSetDefaultConfig()} remains set for
+ * the rest of the JVM.
+ */
+@RunWith(JUnit4.class)
+public class FileSystemsTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /** Default configs are stored and looked up case-insensitively, and can be overwritten. */
+  @Test
+  public void testSetDefaultConfig() throws Exception {
+    PipelineOptions first = PipelineOptionsFactory.create();
+    PipelineOptions second = PipelineOptionsFactory.create();
+    FileSystems.setDefaultConfig("file", first);
+    assertEquals(first, FileSystems.getDefaultConfig("file"));
+    assertEquals(first, FileSystems.getDefaultConfig("FILE"));
+
+    FileSystems.setDefaultConfig("FILE", second);
+    assertNotEquals(first, FileSystems.getDefaultConfig("file"));
+    assertNotEquals(first, FileSystems.getDefaultConfig("FILE"));
+    assertEquals(second, FileSystems.getDefaultConfig("file"));
+    assertEquals(second, FileSystems.getDefaultConfig("FILE"));
+  }
+
+  /** A syntactically valid scheme without a registrar is rejected. */
+  @Test
+  public void testSetDefaultConfigNotFound() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No FileSystemRegistrar found for scheme: [gs-s3].");
+    FileSystems.setDefaultConfig("gs-s3", PipelineOptionsFactory.create());
+  }
+
+  /** A scheme that violates the URI scheme syntax is rejected. */
+  @Test
+  public void testSetDefaultConfigInvalidScheme() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Scheme: [gs:] doesn't match URI syntax");
+    FileSystems.setDefaultConfig("gs:", PipelineOptionsFactory.create());
+  }
+
+  /** Scheme-less URIs and any casing of {@code file} resolve to the local file system. */
+  @Test
+  public void testGetLocalFileSystem() throws Exception {
+    assertTrue(
+        FileSystems.getFileSystemInternal(URI.create("~/home/")) instanceof LocalFileSystem);
+    assertTrue(
+        FileSystems.getFileSystemInternal(URI.create("file://home")) instanceof LocalFileSystem);
+    assertTrue(
+        FileSystems.getFileSystemInternal(URI.create("FILE://home")) instanceof LocalFileSystem);
+    assertTrue(
+        FileSystems.getFileSystemInternal(URI.create("File://home")) instanceof LocalFileSystem);
+  }
+
+  /** Two registrars whose schemes differ only by case are reported as conflicting. */
+  @Test
+  public void testVerifySchemesAreUnique() throws Exception {
+    // Pin the exact exception type thrown by verifySchemesAreUnique, as the other tests do,
+    // rather than the overly broad RuntimeException.
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Scheme: [file] has conflicting registrars");
+    FileSystems.verifySchemesAreUnique(
+        Sets.<FileSystemRegistrar>newHashSet(
+            new LocalFileSystemRegistrar(),
+            new FileSystemRegistrar() {
+              @Override
+              public FileSystem fromOptions(@Nullable PipelineOptions options) {
+                return null;
+              }
+
+              @Override
+              public String getScheme() {
+                return "FILE";
+              }
+            }));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
new file mode 100644
index 0000000..e4e8326
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LocalFileSystemRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class LocalFileSystemRegistrarTest {
+
+  /** The registrar must be discoverable through {@link ServiceLoader}. */
+  @Test
+  public void testServiceLoader() {
+    // Materialize every provider up front, then scan for the local registrar.
+    Iterable<FileSystemRegistrar> loaded =
+        Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator());
+
+    boolean found = false;
+    for (FileSystemRegistrar candidate : loaded) {
+      if (candidate instanceof LocalFileSystemRegistrar) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      fail("Expected to find " + LocalFileSystemRegistrar.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml 
b/sdks/java/io/google-cloud-platform/pom.xml
index d3b5fed..76bdc45 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -100,6 +100,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>util</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
new file mode 100644
index 0000000..4b03e27
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.options.GcsOptions;
+
+/**
+ * {@link FileSystem} implementation for Google Cloud Storage.
+ */
+class GcsFileSystem extends FileSystem {
+  // Stored at construction; not read anywhere in this initial skeleton.
+  private final GcsOptions options;
+
+  /**
+   * Creates a GCS file system backed by the given options.
+   *
+   * @param options the GCS configuration, must not be {@code null}
+   * @throws NullPointerException if {@code options} is {@code null}
+   */
+  GcsFileSystem(GcsOptions options) {
+    this.options = checkNotNull(options, "options");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
new file mode 100644
index 0000000..10452a1
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.storage;
+
+import com.google.auto.service.AutoService;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for the {@link GcsFileSystem}.
+ */
+@AutoService(FileSystemRegistrar.class)
+public class GcsFileSystemRegistrar implements FileSystemRegistrar {
+
+  /**
+   * Creates a {@link GcsFileSystem} from the given options, viewed as {@link GcsOptions}.
+   *
+   * @param options the pipeline options; unlike the interface contract, {@code null} is not
+   *     accepted here
+   * @return a new {@link GcsFileSystem}
+   * @throws NullPointerException if {@code options} is {@code null}
+   */
+  @Override
+  public FileSystem fromOptions(@Nonnull PipelineOptions options) {
+    // The interface declares the parameter @Nullable, so a caller may legitimately pass null
+    // (e.g. when no default config was registered for the scheme). Fail with an actionable
+    // message instead of a bare NullPointerException from options.as(...).
+    if (options == null) {
+      throw new NullPointerException(
+          "options must not be null: GcsFileSystem requires PipelineOptions for scheme [gs]");
+    }
+    return new GcsFileSystem(options.as(GcsOptions.class));
+  }
+
+  /** Returns the Google Cloud Storage scheme, {@code "gs"}. */
+  @Override
+  public String getScheme() {
+    return "gs";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
new file mode 100644
index 0000000..b5378be
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines IO connectors for Google Cloud Storage.
+ */
+package org.apache.beam.sdk.io.gcp.storage;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
new file mode 100644
index 0000000..ecac8f6
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsFileSystemRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class GcsFileSystemRegistrarTest {
+
+  /** The registrar must be discoverable through {@link ServiceLoader} and serve {@code gs}. */
+  @Test
+  public void testServiceLoader() {
+    // Materialize every provider up front, then pick the first GCS registrar.
+    Iterable<FileSystemRegistrar> loaded =
+        Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator());
+
+    FileSystemRegistrar gcsRegistrar = null;
+    for (FileSystemRegistrar candidate : loaded) {
+      if (candidate instanceof GcsFileSystemRegistrar) {
+        gcsRegistrar = candidate;
+        break;
+      }
+    }
+
+    if (gcsRegistrar == null) {
+      fail("Expected to find " + GcsFileSystemRegistrar.class);
+    }
+    assertEquals("gs", gcsRegistrar.getScheme());
+    assertTrue(gcsRegistrar.fromOptions(PipelineOptionsFactory.create()) instanceof GcsFileSystem);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 772276b..b171cfe 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -64,6 +64,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
new file mode 100644
index 0000000..b94a089
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.io.FileSystem;
+
+/**
+ * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as
+ * Apache Beam {@link FileSystem FileSystems}.
+ */
+class HadoopFileSystem extends FileSystem {
+
+  HadoopFileSystem() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
new file mode 100644
index 0000000..1471cb0
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.auto.service.AutoService;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/** {@link AutoService}-published registrar that supplies {@link HadoopFileSystem} instances. */
+@AutoService(FileSystemRegistrar.class)
+public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
+
+  /** Returns {@link FileSystems#DEFAULT_SCHEME} as this registrar's scheme. */
+  @Override
+  public String getScheme() {
+    return FileSystems.DEFAULT_SCHEME;
+  }
+
+  /** Creates a new {@link HadoopFileSystem}; {@code options} is not consulted. */
+  @Override
+  public FileSystem fromOptions(@Nonnull PipelineOptions options) {
+    return new HadoopFileSystem();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
new file mode 100644
index 0000000..22a439a
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Checks that {@link HadoopFileSystemRegistrar} is discoverable through {@link ServiceLoader}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemRegistrarTest {
+  /** Locates the registrar via {@link ServiceLoader} and checks its scheme and file system. */
+  @Test
+  public void testServiceLoader() {
+    for (FileSystemRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
+      if (registrar instanceof HadoopFileSystemRegistrar) {
+        assertEquals(FileSystems.DEFAULT_SCHEME, registrar.getScheme());
+        assertTrue(
+            registrar.fromOptions(PipelineOptionsFactory.create()) instanceof HadoopFileSystem);
+        return; // found and verified, so the fail() after the loop is skipped
+      }
+    }
+    fail("Expected to find " + HadoopFileSystemRegistrar.class);
+  }
+}


Reply via email to