Repository: beam Updated Branches: refs/heads/release-2.0.0 c807fec9b -> 7b598f878
[BEAM-2277] Cherrypick #3121 to release-2.0.0 fbb0de129d Remove '/' entirely from determining FileSystem scheme a6a5ff7be3 [BEAM-2277] Add ResourceIdTester and test existing ResourceId implementations ec956c85ef Mark FileSystem and related as Experimental 15df211c75 [BEAM-2277] HadoopFileSystem: normalize implementation f3540d47f1 Rename FileSystems.setDefaultConfigInWorkers 3921163829 Fix shading of guava testlib Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6ee1d8b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6ee1d8b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6ee1d8b Branch: refs/heads/release-2.0.0 Commit: e6ee1d8b98bcee6ee8c6a80b4af6646990e0c399 Parents: c807fec Author: Dan Halperin <[email protected]> Authored: Fri May 12 14:59:14 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Fri May 12 15:03:50 2017 -0700 ---------------------------------------------------------------------- pom.xml | 4 + .../utils/SerializablePipelineOptions.java | 2 +- runners/direct-java/pom.xml | 4 + .../utils/SerializedPipelineOptions.java | 2 +- .../DataflowPipelineTranslatorTest.java | 2 +- .../runners/dataflow/DataflowRunnerTest.java | 6 +- .../options/DataflowPipelineOptionsTest.java | 4 +- .../runners/dataflow/util/PackageUtilTest.java | 2 +- .../spark/translation/SparkRuntimeContext.java | 2 +- sdks/java/core/pom.xml | 4 + .../org/apache/beam/sdk/PipelineRunner.java | 2 +- .../beam/sdk/annotations/Experimental.java | 7 + .../java/org/apache/beam/sdk/io/AvroIO.java | 4 + .../org/apache/beam/sdk/io/FileBasedSink.java | 21 ++- .../java/org/apache/beam/sdk/io/FileSystem.java | 3 + .../apache/beam/sdk/io/FileSystemRegistrar.java | 3 + .../org/apache/beam/sdk/io/FileSystems.java | 21 ++- .../beam/sdk/io/LocalFileSystemRegistrar.java | 3 + .../org/apache/beam/sdk/io/LocalResources.java | 3 + .../java/org/apache/beam/sdk/io/TFRecordIO.java | 4 + 
.../java/org/apache/beam/sdk/io/TextIO.java | 4 + .../org/apache/beam/sdk/io/fs/ResourceId.java | 3 + .../apache/beam/sdk/testing/TestPipeline.java | 2 +- .../apache/beam/sdk/io/LocalResourceIdTest.java | 6 + .../apache/beam/sdk/io/fs/ResourceIdTester.java | 150 +++++++++++++++++++ .../google-cloud-platform-core/pom.xml | 6 + .../gcp/storage/GcsFileSystemRegistrar.java | 5 +- .../gcp/storage/GcsResourceIdTest.java | 9 ++ sdks/java/io/hadoop-file-system/pom.xml | 13 ++ .../beam/sdk/io/hdfs/HadoopFileSystem.java | 32 ++-- .../sdk/io/hdfs/HadoopFileSystemOptions.java | 3 + .../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 3 + .../beam/sdk/io/hdfs/HadoopResourceId.java | 16 +- .../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 5 +- .../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 71 +++++++++ 35 files changed, 395 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3d02096..8f96acc 100644 --- a/pom.xml +++ b/pom.xml @@ -1446,6 +1446,10 @@ <relocations> <relocation> <pattern>com.google.common</pattern> + <excludes> + <!-- com.google.common is too generic, need to exclude guava-testlib --> + <exclude>com.google.common.**.testing.*</exclude> + </excludes> <!--suppress MavenModelInspection --> <shadedPattern> org.apache.${renderedArtifactId}.repackaged.com.google.common http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java index 02afa7a..46b04fc 100644 
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java @@ -62,7 +62,7 @@ public class SerializablePipelineOptions implements Externalizable { .as(ApexPipelineOptions.class); if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) { - FileSystems.setDefaultConfigInWorkers(pipelineOptions); + FileSystems.setDefaultPipelineOptions(pipelineOptions); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index c971ce3..95d560c 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -131,6 +131,10 @@ </relocation> <relocation> <pattern>com.google.common</pattern> + <excludes> + <!-- com.google.common is too generic, need to exclude guava-testlib --> + <exclude>com.google.common.**.testing.*</exclude> + </excludes> <shadedPattern> org.apache.beam.runners.direct.repackaged.com.google.common </shadedPattern> http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java index 84f3bf4..40b6dd6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -56,7 +56,7 @@ public class SerializedPipelineOptions implements Serializable { try { 
pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class); - FileSystems.setDefaultConfigInWorkers(pipelineOptions); + FileSystems.setDefaultPipelineOptions(pipelineOptions); } catch (IOException e) { throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 93c1e5b..87744f0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -144,7 +144,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Pipeline p = Pipeline.create(options); // Enable the FileSystems API to know about gs:// URIs in this test. 
- FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object")) .apply("WriteMyFile", TextIO.write().to("gs://bucket/object")); http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index ce01aa1..8f10b18 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -177,7 +177,7 @@ public class DataflowRunnerTest { .apply("WriteMyFile", TextIO.write().to("gs://bucket/object")); // Enable the FileSystems API to know about gs:// URIs in this test. - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); return p; } @@ -246,7 +246,7 @@ public class DataflowRunnerTest { options.setGcpCredential(new TestCredential()); // Configure the FileSystem registrar to use these options. 
- FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); return options; } @@ -771,7 +771,7 @@ public class DataflowRunnerTest { @Test public void testInvalidNumberOfWorkerHarnessThreads() throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); options.setRunner(DataflowRunner.class); options.setProject("foo-12345"); http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index 613604a..cea44f0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -137,7 +137,7 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultToTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); options.setPathValidatorClass(NoopPathValidator.class); options.setTempLocation("gs://temp_location/"); assertEquals("gs://temp_location/", options.getGcpTempLocation()); @@ -147,7 +147,7 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultToGcpTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 
- FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); options.setPathValidatorClass(NoopPathValidator.class); options.setTempLocation("gs://temp_location/"); options.setGcpTempLocation("gs://gcp_temp_location/"); http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index c7a660e..5d0c0f2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -120,7 +120,7 @@ public class PackageUtilTest { GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); pipelineOptions.setGcsUtil(mockGcsUtil); - FileSystems.setDefaultConfigInWorkers(pipelineOptions); + FileSystems.setDefaultPipelineOptions(pipelineOptions); createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 82e8b44..f3fe99c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -90,7 +90,7 @@ public class SparkRuntimeContext implements Serializable { } } // Register standard FileSystems. - FileSystems.setDefaultConfigInWorkers(pipelineOptions); + FileSystems.setDefaultPipelineOptions(pipelineOptions); } return pipelineOptions; } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 35d160d..712553d 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -161,6 +161,10 @@ <relocations> <relocation> <pattern>com.google.common</pattern> + <excludes> + <!-- com.google.common is too generic, need to exclude guava-testlib --> + <exclude>com.google.common.**.testing.*</exclude> + </excludes> <!--suppress MavenModelInspection --> <shadedPattern> org.apache.beam.sdk.repackaged.com.google.common http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java index 87705af..c114501 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java @@ -41,7 +41,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> { PipelineOptionsValidator.validate(PipelineOptions.class, options); // (Re-)register standard FileSystems. Clobbers any prior credentials. - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); @SuppressWarnings("unchecked") PipelineRunner<? 
extends PipelineResult> result = http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index ac02465..8224ebb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -23,6 +23,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; + /** * Signifies that a public API (public class, method or field) is subject to incompatible changes, * or even removal, in a future release. @@ -79,6 +80,12 @@ public @interface Experimental { /** Metrics-related experimental APIs. */ METRICS, + /** + * {@link org.apache.beam.sdk.io.FileSystem} and {@link org.apache.beam.sdk.io.fs.ResourceId} + * related APIs. + */ + FILESYSTEM, + /** Experimental feature related to alternative, unnested encodings for coders. 
*/ CODER_CONTEXT, http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index d13c6ff..6af0e79 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -30,6 +30,8 @@ import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; @@ -313,6 +315,7 @@ public class AvroIO { * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be * overridden using {@link #withFilenamePolicy(FilenamePolicy)}. */ + @Experimental(Kind.FILESYSTEM) public Write<T> to(ResourceId outputPrefix) { return toResource(StaticValueProvider.of(outputPrefix)); } @@ -333,6 +336,7 @@ public class AvroIO { /** * Like {@link #to(ResourceId)}. 
*/ + @Experimental(Kind.FILESYSTEM) public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) { return toBuilder().setFilenamePrefix(outputPrefix).build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 7f729a7..8102316 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -47,6 +47,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.NullableCoder; @@ -115,6 +117,7 @@ import org.slf4j.LoggerFactory; * * @param <T> the type of values written to the sink. */ +@Experimental(Kind.FILESYSTEM) public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); @@ -193,6 +196,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * {@code /}, {@code gs://my-bucket}, or {@code c://}. In that case, interpreting the string as a * file will fail and this function will return a directory {@link ResourceId} instead. 
*/ + @Experimental(Kind.FILESYSTEM) public static ResourceId convertToFileResourceIfPossible(String outputPrefix) { try { return FileSystems.matchNewResource(outputPrefix, false /* isDirectory */); @@ -290,6 +294,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * as well as sharding information. The policy must return unique and consistent filenames * for different windows and panes. */ + @Experimental(Kind.FILESYSTEM) public abstract ResourceId windowedFilename( ResourceId outputDirectory, WindowedContext c, String extension); @@ -302,6 +307,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * <p>The {@link Context} object only provides sharding information, which is used by the policy * to generate unique and consistent filenames. */ + @Experimental(Kind.FILESYSTEM) @Nullable public abstract ResourceId unwindowedFilename( ResourceId outputDirectory, Context c, String extension); @@ -320,6 +326,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { /** * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files. */ + @Experimental(Kind.FILESYSTEM) public FileBasedSink( ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) { this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED); @@ -335,6 +342,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { /** * Construct a {@link FileBasedSink} with the given filename policy and output channel type. */ + @Experimental(Kind.FILESYSTEM) public FileBasedSink( ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy, @@ -349,6 +357,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * Returns the base directory inside which files will be written according to the configured * {@link FilenamePolicy}. 
*/ + @Experimental(Kind.FILESYSTEM) public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() { return baseOutputDirectoryProvider; } @@ -358,6 +367,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * the {@link FilenamePolicy} may itself specify one or more inner directories before each output * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format. */ + @Experimental(Kind.FILESYSTEM) public final FilenamePolicy getFilenamePolicy() { return filenamePolicy; } @@ -424,9 +434,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { protected final ValueProvider<ResourceId> tempDirectory; /** Whether windowed writes are being used. */ - protected boolean windowedWrites; + @Experimental(Kind.FILESYSTEM) + protected boolean windowedWrites; /** Constructs a temporary file resource given the temporary directory and a filename. */ + @Experimental(Kind.FILESYSTEM) protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, String filename) throws IOException { return tempDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); @@ -472,6 +484,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * @param sink the FileBasedSink that will be used to configure this write operation. * @param tempDirectory the base directory to be used for temporary output files. 
*/ + @Experimental(Kind.FILESYSTEM) public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) { this(sink, StaticValueProvider.of(tempDirectory)); } @@ -527,6 +540,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites); } + @Experimental(Kind.FILESYSTEM) protected final Map<ResourceId, ResourceId> buildOutputFilenames( Iterable<FileResult> writerResults) { int numShards = Iterables.size(writerResults); @@ -610,6 +624,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * @param filenames the filenames of temporary files. */ @VisibleForTesting + @Experimental(Kind.FILESYSTEM) final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException { int numFiles = filenames.size(); @@ -637,6 +652,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { * temporary files, this method will remove them. */ @VisibleForTesting + @Experimental(Kind.FILESYSTEM) final void removeTemporaryFiles( Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException { ResourceId tempDir = tempDirectory.get(); @@ -945,6 +961,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { private final BoundedWindow window; private final PaneInfo paneInfo; + @Experimental(Kind.FILESYSTEM) public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) { this.tempFilename = tempFilename; this.shard = shard; @@ -952,6 +969,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { this.paneInfo = paneInfo; } + @Experimental(Kind.FILESYSTEM) public ResourceId getTempFilename() { return tempFilename; } @@ -972,6 +990,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { return paneInfo; } + @Experimental(Kind.FILESYSTEM) public ResourceId getDestinationFile(FilenamePolicy 
policy, ResourceId outputDirectory, int numShards, String extension) { checkArgument(getShard() != UNKNOWN_SHARDNUM); http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index 375264a..601feca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -23,6 +23,8 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.Collection; import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.ResourceId; @@ -35,6 +37,7 @@ import org.apache.beam.sdk.io.fs.ResourceId; * <p>All methods are protected, and they are for file system providers to implement. * Clients should use {@link FileSystems} utility. */ +@Experimental(Kind.FILESYSTEM) public abstract class FileSystem<ResourceIdT extends ResourceId> { /** * This is the entry point to convert user-provided specs to {@link ResourceIdT ResourceIds}. 
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java index 78a91f6..50ee6eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java @@ -20,6 +20,8 @@ package org.apache.beam.sdk.io; import com.google.auto.service.AutoService; import java.util.ServiceLoader; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; /** @@ -31,6 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions; * <p>It is optional but recommended to use one of the many build time tools such as * {@link AutoService} to generate the necessary META-INF files automatically. */ +@Experimental(Kind.FILESYSTEM) public interface FileSystemRegistrar { /** * Create zero or more {@link FileSystem filesystems} from the given {@link PipelineOptions}. 
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 2e11177..1aacc90 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -49,6 +49,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nonnull; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; @@ -63,11 +66,12 @@ import org.apache.beam.sdk.values.KV; /** * Clients facing {@link FileSystem} utility. */ +@Experimental(Kind.FILESYSTEM) public class FileSystems { public static final String DEFAULT_SCHEME = "default"; - private static final Pattern URI_SCHEME_PATTERN = Pattern.compile( - "(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*"); + private static final Pattern FILE_SCHEME_PATTERN = + Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*"); private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM = new AtomicReference<Map<String, FileSystem>>( @@ -416,7 +420,7 @@ public class FileSystems { // from their use in the URI spec. ('*' is not reserved). // Here, we just need the scheme, which is so circumscribed as to be // very easy to extract with a regex. 
- Matcher matcher = URI_SCHEME_PATTERN.matcher(spec); + Matcher matcher = FILE_SCHEME_PATTERN.matcher(spec); if (!matcher.matches()) { return "file"; @@ -445,12 +449,21 @@ public class FileSystems { /********************************** METHODS FOR REGISTRATION **********************************/ + /** @deprecated to be removed. */ + @Deprecated // for DataflowRunner backwards compatibility. + public static void setDefaultConfigInWorkers(PipelineOptions options) { + setDefaultPipelineOptions(options); + } + /** * Sets the default configuration in workers. * * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. + * + * <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests. */ - public static void setDefaultConfigInWorkers(PipelineOptions options) { + @Internal + public static void setDefaultPipelineOptions(PipelineOptions options) { checkNotNull(options, "options"); Set<FileSystemRegistrar> registrars = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java index f182360..7896e20 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java @@ -20,12 +20,15 @@ package org.apache.beam.sdk.io; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; /** * {@link 
AutoService} registrar for the {@link LocalFileSystem}. */ @AutoService(FileSystemRegistrar.class) +@Experimental(Kind.FILESYSTEM) public class LocalFileSystemRegistrar implements FileSystemRegistrar { @Override public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) { http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java index 817829b..d234bae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java @@ -20,6 +20,8 @@ package org.apache.beam.sdk.io; import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -28,6 +30,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; /** * Helper functions for producing a {@link ResourceId} that references a local file or directory. 
*/ +@Experimental(Kind.FILESYSTEM) public final class LocalResources { public static ResourceId fromFile(File file, boolean isDirectory) { http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index f73d6f3..c274595 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -31,6 +31,8 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; @@ -277,6 +279,7 @@ public class TFRecordIO { * * <p>For more information on filenames, see {@link DefaultFilenamePolicy}. */ + @Experimental(Kind.FILESYSTEM) public Write to(ResourceId outputResource) { return toResource(StaticValueProvider.of(outputResource)); } @@ -284,6 +287,7 @@ public class TFRecordIO { /** * Like {@link #to(ResourceId)}. 
*/ + @Experimental(Kind.FILESYSTEM) public Write toResource(ValueProvider<ResourceId> outputResource) { return toBuilder().setOutputPrefix(outputResource).build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index af6a069..5c068ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; @@ -306,6 +308,7 @@ public class TextIO { * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should * not be set. */ + @Experimental(Kind.FILESYSTEM) public Write to(ResourceId filenamePrefix) { return toResource(StaticValueProvider.of(filenamePrefix)); } @@ -326,6 +329,7 @@ public class TextIO { /** * Like {@link #to(ResourceId)}. 
*/ + @Experimental(Kind.FILESYSTEM) public Write toResource(ValueProvider<ResourceId> filenamePrefix) { return toBuilder().setFilenamePrefix(filenamePrefix).build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java index dfe771f..9196034 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.fs; import java.io.Serializable; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; @@ -45,6 +47,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; * to generate {@link ResourceId ResourceIds} for resources that may not yet exist. 
* </ul> */ +@Experimental(Kind.FILESYSTEM) public interface ResourceId extends Serializable { /** http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index e04c2f8..9206e04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -413,7 +413,7 @@ public class TestPipeline extends Pipeline implements TestRule { } options.setStableUniqueNames(CheckEnabled.ERROR); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); return options; } catch (IOException e) { throw new RuntimeException( http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java index 7ea85cf..e1ca303 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java @@ -31,6 +31,7 @@ import java.io.File; import java.nio.file.Paths; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.fs.ResourceIdTester; import org.apache.commons.lang3.SystemUtils; import org.junit.Rule; import org.junit.Test; @@ -259,6 +260,11 @@ public class LocalResourceIdTest { "xyz.txt"); } + @Test + public void testResourceIdTester() throws Exception { + 
ResourceIdTester.runResourceIdBattery(toResourceIdentifier("/tmp/foo/")); + } + private LocalResourceId toResourceIdentifier(String str) throws Exception { boolean isDirectory; if (SystemUtils.IS_OS_WINDOWS) { http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java new file mode 100644 index 0000000..8ceaeed --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.fs; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY; +import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.testing.EqualsTester; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.io.FileSystems; + +/** + * A utility to test {@link ResourceId} implementations. + */ +@Experimental(Kind.FILESYSTEM) +public final class ResourceIdTester { + /** + * Enforces that the {@link ResourceId} implementation of {@code baseDirectory} meets the + * {@link ResourceId} spec. + */ + public static void runResourceIdBattery(ResourceId baseDirectory) { + checkArgument( + baseDirectory.isDirectory(), "baseDirectory %s is not a directory", baseDirectory); + + List<ResourceId> allResourceIds = new ArrayList<>(); + allResourceIds.add(baseDirectory); + + // Validate that individual resources meet the fairly restrictive spec we have. + validateResourceIds(allResourceIds); + + // Validate operations with resolving child resources. + validateResolvingIds(baseDirectory, allResourceIds); + + // Validate safeguards against resolving bad paths. 
+ validateFailureResolvingIds(baseDirectory); + } + + private static void validateResolvingIds( + ResourceId baseDirectory, List<ResourceId> allResourceIds) { + ResourceId file1 = baseDirectory.resolve("child1", RESOLVE_FILE); + ResourceId file2 = baseDirectory.resolve("child2", RESOLVE_FILE); + ResourceId file2a = baseDirectory.resolve("child2", RESOLVE_FILE); + allResourceIds.add(file1); + allResourceIds.add(file2); + assertThat("Resolved file isDirectory()", file1.isDirectory(), is(false)); + assertThat("Resolved file isDirectory()", file2.isDirectory(), is(false)); + assertThat("Resolved file isDirectory()", file2a.isDirectory(), is(false)); + + ResourceId dir1 = baseDirectory.resolve("child1", RESOLVE_DIRECTORY); + ResourceId dir2 = baseDirectory.resolve("child2", RESOLVE_DIRECTORY); + ResourceId dir2a = baseDirectory.resolve("child2", RESOLVE_DIRECTORY); + assertThat("Resolved directory isDirectory()", dir1.isDirectory(), is(true)); + assertThat("Resolved directory isDirectory()", dir2.isDirectory(), is(true)); + assertThat("Resolved directory isDirectory()", dir2a.isDirectory(), is(true)); + allResourceIds.add(dir1); + allResourceIds.add(dir2); + + // ResourceIds in equality groups. + new EqualsTester() + .addEqualityGroup(file1) + .addEqualityGroup(file2, file2a) + .addEqualityGroup(dir1, dir1.getCurrentDirectory()) + .addEqualityGroup(dir2, dir2a, dir2.getCurrentDirectory()) + .addEqualityGroup(baseDirectory, file1.getCurrentDirectory(), file2.getCurrentDirectory()) + .testEquals(); + + // ResourceId toString() in equality groups. 
+ new EqualsTester() + .addEqualityGroup(file1.toString()) + .addEqualityGroup(file2.toString(), file2a.toString()) + .addEqualityGroup(dir1.toString(), dir1.getCurrentDirectory().toString()) + .addEqualityGroup(dir2.toString(), dir2a.toString(), dir2.getCurrentDirectory().toString()) + .addEqualityGroup( + baseDirectory.toString(), + file1.getCurrentDirectory().toString(), + file2.getCurrentDirectory().toString()) + .testEquals(); + + // TODO: test resolving strings that need to be escaped. + // Possible spec: https://tools.ietf.org/html/rfc3986#section-2 + // May need options to be filesystem-independent, e.g., if filesystems ban certain chars. + } + + private static void validateFailureResolvingIds(ResourceId baseDirectory) { + try { + ResourceId badFile = baseDirectory.resolve("file/", RESOLVE_FILE); + fail(String.format("Resolving badFile %s should have failed", badFile)); + } catch (RuntimeException expected) { + // Expected. Not Throwable: the AssertionError thrown by fail() must propagate. + } + + ResourceId file = baseDirectory.resolve("file", RESOLVE_FILE); + try { + file.resolve("file2", RESOLVE_FILE); + fail(String.format("Should not be able to resolve against file resource %s", file)); + } catch (RuntimeException expected) { + // Expected: resolving against a file (non-directory) resource must be rejected. + } + } + + private static void validateResourceIds(List<ResourceId> resourceIds) { + for (ResourceId resourceId : resourceIds) { + // ResourceIds should equal themselves. + assertThat("ResourceId equal to itself", resourceId, equalTo(resourceId)); + + // ResourceIds should be clonable via FileSystems#matchNewResource. + ResourceId cloned; + if (resourceId.isDirectory()) { + cloned = FileSystems.matchNewResource(resourceId.toString(), true /* isDirectory */); + } else { + cloned = FileSystems.matchNewResource(resourceId.toString(), false /* isDirectory */); + } + assertThat( + "ResourceId equals clone of itself", cloned, equalTo(resourceId)); + // .. and clones have consistent toString. + assertThat( + "ResourceId toString consistency", cloned.toString(), equalTo(resourceId.toString())); + // ..
and have consistent isDirectory. + assertThat( + "ResourceId isDirectory consistency", + cloned.isDirectory(), + equalTo(resourceId.isDirectory())); + } + } + + private ResourceIdTester() {} // prevent instantiation +} http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/pom.xml b/sdks/java/extensions/google-cloud-platform-core/pom.xml index 5f187c4..891c476 100644 --- a/sdks/java/extensions/google-cloud-platform-core/pom.xml +++ b/sdks/java/extensions/google-cloud-platform-core/pom.xml @@ -152,6 +152,12 @@ <!-- test dependencies --> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava-testlib</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <classifier>tests</classifier> http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java index 9f5980a..f3a67b2 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java @@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.service.AutoService; import 
com.google.common.collect.ImmutableList; import javax.annotation.Nonnull; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; @@ -31,13 +33,14 @@ import org.apache.beam.sdk.options.PipelineOptions; * {@link AutoService} registrar for the {@link GcsFileSystem}. */ @AutoService(FileSystemRegistrar.class) +@Experimental(Kind.FILESYSTEM) public class GcsFileSystemRegistrar implements FileSystemRegistrar { @Override public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) { checkNotNull( options, - "Expect the runner have called FileSystems.setDefaultConfigInWorkers()."); + "Expect the runner have called FileSystems.setDefaultPipelineOptions()."); return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java index b245610..c1e214e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java @@ -22,8 +22,11 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import org.apache.beam.sdk.io.FileSystems; 
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.fs.ResourceIdTester; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.junit.Rule; import org.junit.Test; @@ -163,6 +166,12 @@ public class GcsResourceIdTest { "xyz.txt"); } + @Test + public void testResourceIdTester() throws Exception { + FileSystems.setDefaultPipelineOptions(TestPipeline.testingPipelineOptions()); + ResourceIdTester.runResourceIdBattery(toResourceIdentifier("gs://bucket/foo/")); + } + private GcsResourceId toResourceIdentifier(String str) throws Exception { return GcsResourceId.fromGcsPath(GcsPath.fromUri(str)); } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml index 46f3e32..b90ccf0 100644 --- a/sdks/java/io/hadoop-file-system/pom.xml +++ b/sdks/java/io/hadoop-file-system/pom.xml @@ -157,6 +157,19 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava-testlib</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index 154a818..d519a8c 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; @@ -82,8 +81,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> { List<Metadata> metadata = new ArrayList<>(); for (FileStatus fileStatus : fileStatuses) { if (fileStatus.isFile()) { + URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString()); metadata.add(Metadata.builder() - .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri())) + .setResourceId(new HadoopResourceId(uri)) .setIsReadSeekEfficient(true) .setSizeBytes(fileStatus.getLen()) .build()); @@ -151,19 +151,13 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> { @Override protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { - try { - if (singleResourceSpec.endsWith("/") && !isDirectory) { - throw new IllegalArgumentException(String.format( - "Expected file path but received directory path %s", singleResourceSpec)); - } - return !singleResourceSpec.endsWith("/") && isDirectory - ? 
new HadoopResourceId(new URI(singleResourceSpec + "/")) - : new HadoopResourceId(new URI(singleResourceSpec)); - } catch (URISyntaxException e) { - throw new IllegalArgumentException( - String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory), - e); + if (singleResourceSpec.endsWith("/") && !isDirectory) { + throw new IllegalArgumentException(String.format( + "Expected file path but received directory path %s", singleResourceSpec)); } + return !singleResourceSpec.endsWith("/") && isDirectory + ? new HadoopResourceId(dropEmptyAuthority(singleResourceSpec + "/")) + : new HadoopResourceId(dropEmptyAuthority(singleResourceSpec)); } @Override @@ -237,4 +231,14 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> { inputStream.close(); } } + + private static URI dropEmptyAuthority(String uriStr) { + URI uri = URI.create(uriStr); + String prefix = uri.getScheme() + ":///"; + if (uriStr.startsWith(prefix)) { + return URI.create(uri.getScheme() + ":/" + uriStr.substring(prefix.length())); + } else { + return uri; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java index 45f43e2..f5183d5 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java @@ -25,6 +25,8 @@ import java.io.File; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import 
org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; @@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory; * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration} * for the {@link HadoopFileSystem}. */ +@Experimental(Kind.FILESYSTEM) public interface HadoopFileSystemOptions extends PipelineOptions { @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. " + "By default, Hadoop configuration is loaded from 'core-site.xml' and 'hdfs-site.xml' " http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java index 9159df3..8c57089 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java @@ -25,6 +25,8 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import javax.annotation.Nonnull; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; import org.apache.beam.sdk.options.PipelineOptions; @@ -34,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; * {@link AutoService} registrar for the {@link HadoopFileSystem}. 
*/ @AutoService(FileSystemRegistrar.class) +@Experimental(Kind.FILESYSTEM) public class HadoopFileSystemRegistrar implements FileSystemRegistrar { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java index e570864..88fa32a 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java @@ -17,9 +17,13 @@ */ package org.apache.beam.sdk.io.hdfs; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + import java.net.URI; import java.util.Objects; import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.hadoop.fs.Path; @@ -35,7 +39,19 @@ class HadoopResourceId implements ResourceId { @Override public ResourceId resolve(String other, ResolveOptions resolveOptions) { - return new HadoopResourceId(uri.resolve(other)); + checkState(isDirectory(), "Expected this resource to be a directory, but was [%s]", uri); + if (resolveOptions == StandardResolveOptions.RESOLVE_DIRECTORY) { + if (!other.endsWith("/")) { + other += '/'; + } + return new HadoopResourceId(uri.resolve(other)); + } else if (resolveOptions == StandardResolveOptions.RESOLVE_FILE) { + checkArgument(!other.endsWith("/"), "Resolving a file with a directory path: %s", other); + return new HadoopResourceId(uri.resolve(other)); + } else { + throw new UnsupportedOperationException( + String.format("Unexpected StandardResolveOptions %s", resolveOptions)); + } } @Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java index cf86c36..88275f4 100644 --- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java @@ -63,14 +63,13 @@ public class HadoopFileSystemTest { @Rule public TestPipeline p = TestPipeline.create(); @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); - private Configuration configuration; private MiniDFSCluster hdfsCluster; private URI hdfsClusterBaseUri; private HadoopFileSystem fileSystem; @Before public void setUp() throws Exception { - configuration = new Configuration(); + Configuration configuration = new Configuration(); configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); hdfsCluster = builder.build(); @@ -220,7 +219,7 @@ public class HadoopFileSystemTest { HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions() .as(HadoopFileSystemOptions.class); options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf())); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); PCollection<String> pc = p.apply( TextIO.read().from(testPath("testFile*").toString())); PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC"); 
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java new file mode 100644 index 0000000..c4a8577 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import java.net.URI; +import java.util.Collections; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.fs.ResourceIdTester; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Tests for {@link HadoopResourceId}. + */ +public class HadoopResourceIdTest { + + private MiniDFSCluster hdfsCluster; + private URI hdfsClusterBaseUri; + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Before + public void setUp() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); + hdfsCluster = builder.build(); + hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); + + // Register HadoopFileSystem for this test. + HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class); + options.setHdfsConfiguration(Collections.singletonList(configuration)); + FileSystems.setDefaultPipelineOptions(options); + } + + @After + public void tearDown() throws Exception { + hdfsCluster.shutdown(); + } + + @Test + public void testResourceIdTester() throws Exception { + ResourceId baseDirectory = + FileSystems.matchNewResource( + "hdfs://" + hdfsClusterBaseUri.getPath(), true /* isDirectory */); + ResourceIdTester.runResourceIdBattery(baseDirectory); + } +}
