Repository: incubator-beam Updated Branches: refs/heads/master 4f97efc11 -> 28d7913be
[BEAM-59] initial interfaces and classes of Beam FileSystem. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/467f7d17 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/467f7d17 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/467f7d17 Branch: refs/heads/master Commit: 467f7d17c4c96bc57b0160c2d4768ceb303bc561 Parents: 4f97efc Author: Pei He <pe...@google.com> Authored: Wed Dec 7 17:35:23 2016 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Mon Dec 19 15:20:37 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/FileSystem.java | 29 ++++ .../apache/beam/sdk/io/FileSystemRegistrar.java | 49 ++++++ .../org/apache/beam/sdk/io/FileSystems.java | 155 +++++++++++++++++++ .../org/apache/beam/sdk/io/LocalFileSystem.java | 27 ++++ .../beam/sdk/io/LocalFileSystemRegistrar.java | 41 +++++ .../org/apache/beam/sdk/io/FileSystemsTest.java | 104 +++++++++++++ .../sdk/io/LocalFileSystemRegistrarTest.java | 44 ++++++ sdks/java/io/google-cloud-platform/pom.xml | 6 + .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 34 ++++ .../io/gcp/storage/GcsFileSystemRegistrar.java | 42 +++++ .../beam/sdk/io/gcp/storage/package-info.java | 21 +++ .../gcp/storage/GcsFileSystemRegistrarTest.java | 51 ++++++ sdks/java/io/hdfs/pom.xml | 6 + .../beam/sdk/io/hdfs/HadoopFileSystem.java | 29 ++++ .../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 42 +++++ .../io/hdfs/HadoopFileSystemRegistrarTest.java | 52 +++++++ 16 files changed, 732 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
new file mode 100644
index 0000000..d990403
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+/**
+ * File system interface in Beam.
+ *
+ * <p>It defines APIs for writing file-system-agnostic code.
+ *
+ * <p>All methods are protected and are meant to be implemented by file system providers.
+ * Clients should use the {@link FileSystems} utility rather than a {@link FileSystem} directly.
+ */
+public abstract class FileSystem {
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
new file mode 100644
index 0000000..1d81c1e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A registrar that creates {@link FileSystem} instances from {@link PipelineOptions}.
+ *
+ * <p>{@link FileSystem} creators have the ability to provide a registrar by creating
+ * a {@link ServiceLoader} entry and a concrete implementation of this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+public interface FileSystemRegistrar {
+  /**
+   * Create a {@link FileSystem} from the given {@link PipelineOptions}.
+   *
+   * <p>{@link FileSystems} passes the default configuration set through
+   * {@link FileSystems#setDefaultConfig}, or {@code null} when none is available. Implementations
+   * must therefore accept {@code null}, either by falling back to defaults or by failing with a
+   * descriptive error.
+   *
+   * @param options the default configuration for this registrar, or {@code null}
+   */
+  FileSystem fromOptions(@Nullable PipelineOptions options);
+
+  /**
+   * Get the URI scheme which defines the namespace of the {@link FileSystemRegistrar}.
+   *
+   * <p>The scheme is required to be unique among all
+   * {@link FileSystemRegistrar FileSystemRegistrars}. Schemes are compared case-insensitively.
+   *
+   * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
+   */
+  String getScheme();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
new file mode 100644
index 0000000..d086ec6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/**
+ * Client-facing {@link FileSystem} utility.
+ *
+ * <p>Maps URI schemes to the {@link FileSystemRegistrar FileSystemRegistrars} discovered via
+ * {@link ServiceLoader}, and keeps the default {@link PipelineOptions} configured per scheme.
+ * Schemes are matched case-insensitively, independent of the JVM's default locale.
+ */
+public class FileSystems {
+
+  /** Scheme of the fallback registrar, used for schemes that have no registrar of their own. */
+  public static final String DEFAULT_SCHEME = "default";
+
+  private static final Pattern URI_SCHEME_PATTERN = Pattern.compile("^[a-zA-Z][-a-zA-Z0-9+.]*$");
+
+  private static final Map<String, FileSystemRegistrar> SCHEME_TO_REGISTRAR =
+      new ConcurrentHashMap<>();
+
+  private static final Map<String, PipelineOptions> SCHEME_TO_DEFAULT_CONFIG =
+      new ConcurrentHashMap<>();
+
+  static {
+    loadFileSystemRegistrars();
+  }
+
+  /**
+   * Loads available {@link FileSystemRegistrar} services.
+   *
+   * @throws IllegalStateException if two registrars claim the same (case-insensitive) scheme
+   */
+  private static void loadFileSystemRegistrars() {
+    SCHEME_TO_REGISTRAR.clear();
+    Set<FileSystemRegistrar> registrars =
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
+    registrars.addAll(Lists.newArrayList(
+        ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));
+
+    verifySchemesAreUnique(registrars);
+
+    for (FileSystemRegistrar registrar : registrars) {
+      SCHEME_TO_REGISTRAR.put(normalizeScheme(registrar.getScheme()), registrar);
+    }
+  }
+
+  /**
+   * Sets the default configuration to be used with a {@link FileSystemRegistrar} for the provided
+   * {@code scheme}.
+   *
+   * <p>Syntax: <pre>scheme = alpha *( alpha | digit | "+" | "-" | "." )</pre>
+   * Upper case letters are treated as the same as lower case letters.
+   *
+   * @throws NullPointerException if {@code scheme} or {@code options} is null
+   * @throws IllegalArgumentException if {@code scheme} does not match the URI scheme syntax or
+   *     has no registered {@link FileSystemRegistrar}
+   */
+  public static void setDefaultConfig(String scheme, PipelineOptions options) {
+    String lowerCaseScheme = normalizeScheme(checkNotNull(scheme, "scheme"));
+    // Guava message templates are only formatted when the check actually fails.
+    checkArgument(
+        URI_SCHEME_PATTERN.matcher(lowerCaseScheme).matches(),
+        "Scheme: [%s] doesn't match URI syntax: %s",
+        lowerCaseScheme, URI_SCHEME_PATTERN.pattern());
+    checkArgument(
+        SCHEME_TO_REGISTRAR.containsKey(lowerCaseScheme),
+        "No FileSystemRegistrar found for scheme: [%s].", lowerCaseScheme);
+    SCHEME_TO_DEFAULT_CONFIG.put(lowerCaseScheme, checkNotNull(options, "options"));
+  }
+
+  /** Returns the default configuration for {@code scheme}, or {@code null} if none was set. */
+  @VisibleForTesting
+  static PipelineOptions getDefaultConfig(String scheme) {
+    return SCHEME_TO_DEFAULT_CONFIG.get(normalizeScheme(scheme));
+  }
+
+  /**
+   * Internal method to get {@link FileSystem} for {@code uri}.
+   *
+   * <p>A URI without a scheme is treated as a local file. The {@link FileSystem} is created with
+   * the default configuration of the registrar that actually handles the scheme. When the
+   * {@link #DEFAULT_SCHEME} fallback registrar is used, it therefore receives the configuration
+   * set for {@link #DEFAULT_SCHEME}. The configuration passed may be {@code null}.
+   */
+  @VisibleForTesting
+  static FileSystem getFileSystemInternal(URI uri) {
+    String lowerCaseScheme = (uri.getScheme() != null
+        ? normalizeScheme(uri.getScheme()) : LocalFileSystemRegistrar.LOCAL_FILE_SCHEME);
+    FileSystemRegistrar registrar = getRegistrarInternal(lowerCaseScheme);
+    // setDefaultConfig() only accepts schemes that have a registrar, so the config must be looked
+    // up under the resolved registrar's scheme, not the (possibly unregistered) URI scheme.
+    return registrar.fromOptions(getDefaultConfig(registrar.getScheme()));
+  }
+
+  /**
+   * Internal method to get {@link FileSystemRegistrar} for {@code scheme}.
+   *
+   * <p>Falls back to the registrar for {@link #DEFAULT_SCHEME} when {@code scheme} has none.
+   *
+   * @throws IllegalStateException if neither {@code scheme} nor {@link #DEFAULT_SCHEME} has a
+   *     registrar
+   */
+  @VisibleForTesting
+  static FileSystemRegistrar getRegistrarInternal(String scheme) {
+    String lowerCaseScheme = normalizeScheme(scheme);
+    FileSystemRegistrar registrar = SCHEME_TO_REGISTRAR.get(lowerCaseScheme);
+    if (registrar == null) {
+      registrar = SCHEME_TO_REGISTRAR.get(DEFAULT_SCHEME);
+    }
+    if (registrar == null) {
+      throw new IllegalStateException("Unable to find registrar for " + scheme);
+    }
+    return registrar;
+  }
+
+  /**
+   * Verifies that no two registrars claim the same scheme, ignoring case.
+   *
+   * @throws IllegalStateException naming the scheme and the conflicting registrar classes
+   */
+  @VisibleForTesting
+  static void verifySchemesAreUnique(Set<FileSystemRegistrar> registrars) {
+    Multimap<String, FileSystemRegistrar> registrarsBySchemes =
+        TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());
+
+    for (FileSystemRegistrar registrar : registrars) {
+      registrarsBySchemes.put(normalizeScheme(registrar.getScheme()), registrar);
+    }
+    for (Entry<String, Collection<FileSystemRegistrar>> entry
+        : registrarsBySchemes.asMap().entrySet()) {
+      if (entry.getValue().size() > 1) {
+        String conflictingRegistrars = Joiner.on(", ").join(
+            FluentIterable.from(entry.getValue())
+                .transform(new Function<FileSystemRegistrar, String>() {
+                  @Override
+                  public String apply(@Nonnull FileSystemRegistrar input) {
+                    return input.getClass().getName();
+                  }})
+                .toSortedList(Ordering.<String>natural()));
+        throw new IllegalStateException(String.format(
+            "Scheme: [%s] has conflicting registrars: [%s]",
+            entry.getKey(),
+            conflictingRegistrars));
+      }
+    }
+  }
+
+  /**
+   * Lower-cases {@code scheme} using {@link Locale#ROOT}. This way {@code "FILE"} maps to
+   * {@code "file"} even when the default locale has different casing rules (e.g. Turkish).
+   */
+  private static String normalizeScheme(String scheme) {
+    return scheme.toLowerCase(Locale.ROOT);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
new file mode 100644
index 0000000..23c2a92
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the
Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+/**
+ * {@link FileSystem} implementation for local files.
+ */
+class LocalFileSystem extends FileSystem {
+
+  LocalFileSystem() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
new file mode 100644
index 0000000..75a38e8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for the {@link LocalFileSystem}.
+ */
+@AutoService(FileSystemRegistrar.class)
+public class LocalFileSystemRegistrar implements FileSystemRegistrar {
+
+  /** Scheme for local files; {@link FileSystems} also uses it for URIs that have no scheme. */
+  static final String LOCAL_FILE_SCHEME = "file";
+
+  /** Returns a new {@link LocalFileSystem}; {@code options} is ignored and may be {@code null}. */
+  @Override
+  public FileSystem fromOptions(@Nullable PipelineOptions options) {
+    return new LocalFileSystem();
+  }
+
+  @Override
+  public String getScheme() {
+    return LOCAL_FILE_SCHEME;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
new file mode 100644
index 0000000..9b41b98
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FileSystems}.
+ */
+@RunWith(JUnit4.class)
+public class FileSystemsTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testSetDefaultConfig() throws Exception {
+    // NOTE(review): FileSystems keeps its config in static state with no reset API. The "file"
+    // config set here therefore stays visible to later tests running in the same JVM.
+    PipelineOptions first = PipelineOptionsFactory.create();
+    PipelineOptions second = PipelineOptionsFactory.create();
+    FileSystems.setDefaultConfig("file", first);
+    assertEquals(first, FileSystems.getDefaultConfig("file"));
+    assertEquals(first, FileSystems.getDefaultConfig("FILE"));
+
+    FileSystems.setDefaultConfig("FILE", second);
+    assertNotEquals(first, FileSystems.getDefaultConfig("file"));
+    assertNotEquals(first, FileSystems.getDefaultConfig("FILE"));
+    assertEquals(second, FileSystems.getDefaultConfig("file"));
+    assertEquals(second, FileSystems.getDefaultConfig("FILE"));
+  }
+
+  @Test
+  public void testSetDefaultConfigNotFound() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No FileSystemRegistrar found for scheme: [gs-s3].");
+    FileSystems.setDefaultConfig("gs-s3", PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testSetDefaultConfigInvalidScheme() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Scheme: [gs:] doesn't match URI syntax");
+    FileSystems.setDefaultConfig("gs:", PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testGetLocalFileSystem() throws Exception {
+    assertTrue(
+        FileSystems.getFileSystemInternal(URI.create("~/home/")) instanceof LocalFileSystem);
+    assertTrue(
+        FileSystems.getFileSystemInternal(URI.create("file://home")) instanceof LocalFileSystem);
+    assertTrue(
+        FileSystems.getFileSystemInternal(URI.create("FILE://home")) instanceof LocalFileSystem);
+    assertTrue(
+        FileSystems.getFileSystemInternal(URI.create("File://home")) instanceof LocalFileSystem);
+  }
+
+  @Test
+  public void testVerifySchemesAreUnique() throws Exception {
+    // verifySchemesAreUnique throws IllegalStateException. Expecting that exact type keeps this
+    // test from passing on an unrelated RuntimeException.
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Scheme: [file] has conflicting registrars");
+    FileSystems.verifySchemesAreUnique(
+        Sets.<FileSystemRegistrar>newHashSet(
+            new LocalFileSystemRegistrar(),
+            new FileSystemRegistrar() {
+              @Override
+              public FileSystem fromOptions(@Nullable PipelineOptions options) {
+                return null;
+              }
+
+              @Override
+              public String getScheme() {
+                return "FILE";
+              }
+            }));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
new file mode 100644
index 0000000..e4e8326
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LocalFileSystemRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class LocalFileSystemRegistrarTest {
+
+  @Test
+  public void testServiceLoader() {
+    for (FileSystemRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
+      if (registrar instanceof LocalFileSystemRegistrar) {
+        assertEquals("file", registrar.getScheme());
+        // FileSystems passes null when no default config was set; the local registrar must cope.
+        assertTrue(registrar.fromOptions(null) instanceof LocalFileSystem);
+        return;
+      }
+    }
+    fail("Expected to find " + LocalFileSystemRegistrar.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index d3b5fed..76bdc45 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -100,6 +100,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>util</artifactId>
     </dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
new file mode 100644
index 0000000..4b03e27
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.options.GcsOptions;
+
+/**
+ * {@link FileSystem} implementation for Google Cloud Storage.
+ */
+class GcsFileSystem extends FileSystem {
+  private final GcsOptions options;
+
+  GcsFileSystem(GcsOptions options) {
+    this.options = checkNotNull(options, "options");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
new file mode 100644
index 0000000..10452a1
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.sdk.io.gcp.storage; + +import com.google.auto.service.AutoService; +import javax.annotation.Nonnull; +import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.FileSystemRegistrar; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * {@link AutoService} registrar for the {@link GcsFileSystem}. + */ +@AutoService(FileSystemRegistrar.class) +public class GcsFileSystemRegistrar implements FileSystemRegistrar { + + @Override + public FileSystem fromOptions(@Nonnull PipelineOptions options) { + return new GcsFileSystem(options.as(GcsOptions.class)); + } + + @Override + public String getScheme() { + return "gs"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java new file mode 100644 index 0000000..b5378be --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines IO connectors for Google Cloud Storage. + */ +package org.apache.beam.sdk.io.gcp.storage; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java new file mode 100644 index 0000000..ecac8f6 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.util.ServiceLoader; + +import org.apache.beam.sdk.io.FileSystemRegistrar; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link GcsFileSystemRegistrar}. + */ +@RunWith(JUnit4.class) +public class GcsFileSystemRegistrarTest { + + @Test + public void testServiceLoader() { + for (FileSystemRegistrar registrar + : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { + if (registrar instanceof GcsFileSystemRegistrar) { + assertEquals("gs", registrar.getScheme()); + assertTrue(registrar.fromOptions(PipelineOptionsFactory.create()) instanceof GcsFileSystem); + return; + } + } + fail("Expected to find " + GcsFileSystemRegistrar.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index 772276b..b171cfe 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -64,6 +64,12 @@ </dependency> <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java new file mode 100644 index 0000000..b94a089 --- /dev/null +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import org.apache.beam.sdk.io.FileSystem; + +/** + * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as + * Apache Beam {@link FileSystem FileSystems}. 
+ * + * <p>At this revision the class declares only a package-private no-arg constructor; + * {@link HadoopFileSystemRegistrar#fromOptions} creates instances through it. + */ +class HadoopFileSystem extends FileSystem { + /** Package-private: only code in {@code org.apache.beam.sdk.io.hdfs} can invoke this constructor. */ + HadoopFileSystem() {} +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java new file mode 100644 index 0000000..1471cb0 --- /dev/null +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import com.google.auto.service.AutoService; +import javax.annotation.Nonnull; +import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.FileSystemRegistrar; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * {@link AutoService} registrar for the {@link HadoopFileSystem}. 
+ * + * <p>The {@code @AutoService} annotation publishes this class as a {@link FileSystemRegistrar} + * provider so that it can be discovered through {@link java.util.ServiceLoader}. + */ +@AutoService(FileSystemRegistrar.class) +public class HadoopFileSystemRegistrar implements FileSystemRegistrar { + /** Returns a new {@link HadoopFileSystem}; {@code options} is currently not consulted. */ + @Override + public FileSystem fromOptions(@Nonnull PipelineOptions options) { + return new HadoopFileSystem(); + } + /** + * Returns {@link FileSystems#DEFAULT_SCHEME}. + * + * <p>NOTE(review): a Hadoop-specific scheme (e.g. {@code "hdfs"}) may be intended here; + * confirm that no other registrar also claims the default scheme. + */ + @Override + public String getScheme() { + return FileSystems.DEFAULT_SCHEME; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java new file mode 100644 index 0000000..22a439a --- /dev/null +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.util.ServiceLoader; +import org.apache.beam.sdk.io.FileSystemRegistrar; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link HadoopFileSystemRegistrar}. + * + * <p>Verifies that the registrar can be discovered through {@link ServiceLoader}, reports + * {@link FileSystems#DEFAULT_SCHEME}, and builds a {@link HadoopFileSystem} from options created + * by {@code PipelineOptionsFactory.create()}. + */ +@RunWith(JUnit4.class) +public class HadoopFileSystemRegistrarTest { + /** Fails unless some registrar returned by {@link ServiceLoader} is a {@link HadoopFileSystemRegistrar}; only the first such registrar is checked. */ + @Test + public void testServiceLoader() { + for (FileSystemRegistrar registrar + : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { + if (registrar instanceof HadoopFileSystemRegistrar) { + assertEquals(FileSystems.DEFAULT_SCHEME, registrar.getScheme()); + assertTrue( + registrar.fromOptions(PipelineOptionsFactory.create()) instanceof HadoopFileSystem); + return; + } + } + fail("Expected to find " + HadoopFileSystemRegistrar.class); + } +}