Repository: beam Updated Branches: refs/heads/release-2.0.0 336b3dc29 -> bad377c67
[BEAM-2211] DataflowRunner: remove validation of file read/write paths. Now that users can implement and register custom FileSystems, we can no longer effectively validate which filesystems they read files from or write files to. They can even register file:// to point to, e.g., some HDFS path. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e90e548 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e90e548 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e90e548 Branch: refs/heads/release-2.0.0 Commit: 9e90e54809d0e11774e8e923e9f9f19651f1d28f Parents: 336b3dc Author: Dan Halperin <[email protected]> Authored: Mon May 8 15:37:29 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon May 8 20:19:08 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 60 -------------------- .../beam/runners/dataflow/ReadTranslator.java | 12 ---- .../DataflowPipelineTranslatorTest.java | 26 --------- .../runners/dataflow/DataflowRunnerTest.java | 56 ------------------ 4 files changed, 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9e90e548/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5278a4a..250c064 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -84,13 +84,10 @@ import
org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.fs.PathValidator; -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; @@ -98,7 +95,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride; @@ -343,10 +339,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransformMatchers.stateOrTimerParDoSingle(), BatchStatefulParDoOverrides.singleOutputOverrideFactory())) - // WriteFiles uses views internally - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(WriteFiles.class), new BatchWriteFactory(this))) .add( PTransformOverride.of( PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), @@ -805,58 +797,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { ptransformViewsWithNonDeterministicKeyCoders.add(ptransform); } - private class BatchWriteFactory<T> - implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> { - private final DataflowRunner runner; - private 
BatchWriteFactory(DataflowRunner dataflowRunner) { - this.runner = dataflowRunner; - } - - @Override - public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform( - AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) { - return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), - new BatchWrite<>(runner, transform.getTransform())); - } - - @Override - public Map<PValue, ReplacementOutput> mapOutputs( - Map<TupleTag<?>, PValue> outputs, PDone newOutput) { - return Collections.emptyMap(); - } - } - - /** - * Specialized implementation which overrides - * {@link WriteFiles WriteFiles} to provide Google - * Cloud Dataflow specific path validation of {@link FileBasedSink}s. - */ - private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> { - private final DataflowRunner runner; - private final WriteFiles<T> transform; - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) { - this.runner = runner; - this.transform = transform; - } - - @Override - public PDone expand(PCollection<T> input) { - ValueProvider<ResourceId> outputDirectory = - transform.getSink().getBaseOutputDirectoryProvider(); - if (outputDirectory.isAccessible()) { - PathValidator validator = runner.options.getPathValidator(); - validator.validateOutputResourceSupported( - outputDirectory.get().resolve("some-file", StandardResolveOptions.RESOLVE_FILE)); - } - return transform.expand(input); - } - } - // ================================================================================ // PubsubIO translations // ================================================================================ http://git-wip-us.apache.org/repos/asf/beam/blob/9e90e548/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java index 396c305..30ecbf5 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -26,10 +26,8 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.dataflow.internal.CustomSources; import org.apache.beam.runners.dataflow.util.PropertyNames; -import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PValue; @@ -46,16 +44,6 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { PTransform<?, ? 
extends PValue> transform, TranslationContext context) { try { - if (source instanceof FileBasedSource) { - ValueProvider<String> filePatternOrSpec = - ((FileBasedSource<?>) source).getFileOrPatternSpecProvider(); - if (filePatternOrSpec.isAccessible()) { - context.getPipelineOptions() - .getPathValidator() - .validateInputFilePatternSupported(filePatternOrSpec.get()); - } - } - StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); stepContext.addInput( http://git-wip-us.apache.org/repos/asf/beam/blob/9e90e548/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 48cb79f..9040f8f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -20,13 +20,10 @@ package org.apache.beam.runners.dataflow; import static org.apache.beam.runners.dataflow.util.Structs.addObject; import static org.apache.beam.runners.dataflow.util.Structs.getDictionary; import static org.apache.beam.runners.dataflow.util.Structs.getString; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.not; -import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; 
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -109,7 +106,6 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; -import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -809,28 +805,6 @@ public class DataflowPipelineTranslatorTest implements Serializable { pipeline.apply("Read(" + path + ")", TextIO.read().from(path)); } - /** - * Recursive wildcards are not supported. - * This tests "**". - */ - @Test - public void testBadWildcardRecursive() throws Exception { - DataflowPipelineOptions options = buildPipelineOptions(); - Pipeline pipeline = Pipeline.create(options); - DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options); - - pipeline.apply(TextIO.read().from("gs://bucket/foo**/baz")); - - // Check that translation does fail. - thrown.expectCause(allOf( - instanceOf(IllegalArgumentException.class), - ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage")))); - t.translate( - pipeline, - DataflowRunner.fromOptions(options), - Collections.<DataflowPackage>emptyList()); - } - private static class TestValueProvider implements ValueProvider<String>, Serializable { @Override public boolean isAccessible() { http://git-wip-us.apache.org/repos/asf/beam/blob/9e90e548/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 3d47726..6848b85 100644 --- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; @@ -97,7 +96,6 @@ import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @@ -588,60 +586,6 @@ public class DataflowRunnerTest { } @Test - public void testNonGcsFilePathInReadFailure() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - p.apply("ReadMyNonGcsFile", TextIO.read().from(tmpFolder.newFile().getPath())); - - thrown.expectCause(Matchers.allOf( - instanceOf(IllegalArgumentException.class), - ThrowableMessageMatcher.hasMessage( - containsString("Expected a valid 'gs://' path but was given")))); - p.run(); - - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testNonGcsFilePathInWriteFailure() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - - p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object")) - .apply("WriteMyNonGcsFile", TextIO.write().to("/tmp/file")); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given")); - 
p.run(); - } - - @Test - public void testMultiSlashGcsFileReadPath() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - p.apply("ReadInvalidGcsFile", TextIO.read().from("gs://bucket/tmp//file")); - - thrown.expectCause(Matchers.allOf( - instanceOf(IllegalArgumentException.class), - ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes")))); - p.run(); - - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testMultiSlashGcsFileWritePath() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object")); - pc.apply("WriteInvalidGcsFile", TextIO.write().to("gs://bucket/tmp//file")); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("consecutive slashes"); - p.run(); - } - - @Test public void testInvalidGcpTempLocation() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); options.setGcpTempLocation("file://temp/location");
