Repository: beam Updated Branches: refs/heads/master 88e044fda -> 23731fe7a
[BEAM-2211] DataflowRunner: remove validation of file read/write paths Now that users can implement and register custom FileSystems, we can no longer effectively validate which filesystems they read files from or write files to. For example, they can even register file:// to point to some HDFS path. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16f355f7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16f355f7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16f355f7 Branch: refs/heads/master Commit: 16f355f7e481ebf029c0edb878742f6fea57b6cd Parents: 88e044f Author: Dan Halperin <[email protected]> Authored: Mon May 8 15:37:29 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon May 8 20:13:35 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 60 -------------------- .../beam/runners/dataflow/ReadTranslator.java | 12 ---- .../DataflowPipelineTranslatorTest.java | 26 --------- .../runners/dataflow/DataflowRunnerTest.java | 56 ------------------ 4 files changed, 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/16f355f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5278a4a..250c064 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -84,13 +84,10 @@ import 
org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.fs.PathValidator; -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; @@ -98,7 +95,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride; @@ -343,10 +339,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransformMatchers.stateOrTimerParDoSingle(), BatchStatefulParDoOverrides.singleOutputOverrideFactory())) - // WriteFiles uses views internally - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(WriteFiles.class), new BatchWriteFactory(this))) .add( PTransformOverride.of( PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), @@ -805,58 +797,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { ptransformViewsWithNonDeterministicKeyCoders.add(ptransform); } - private class BatchWriteFactory<T> - implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> { - private final DataflowRunner runner; - private 
BatchWriteFactory(DataflowRunner dataflowRunner) { - this.runner = dataflowRunner; - } - - @Override - public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform( - AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) { - return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), - new BatchWrite<>(runner, transform.getTransform())); - } - - @Override - public Map<PValue, ReplacementOutput> mapOutputs( - Map<TupleTag<?>, PValue> outputs, PDone newOutput) { - return Collections.emptyMap(); - } - } - - /** - * Specialized implementation which overrides - * {@link WriteFiles WriteFiles} to provide Google - * Cloud Dataflow specific path validation of {@link FileBasedSink}s. - */ - private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> { - private final DataflowRunner runner; - private final WriteFiles<T> transform; - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) { - this.runner = runner; - this.transform = transform; - } - - @Override - public PDone expand(PCollection<T> input) { - ValueProvider<ResourceId> outputDirectory = - transform.getSink().getBaseOutputDirectoryProvider(); - if (outputDirectory.isAccessible()) { - PathValidator validator = runner.options.getPathValidator(); - validator.validateOutputResourceSupported( - outputDirectory.get().resolve("some-file", StandardResolveOptions.RESOLVE_FILE)); - } - return transform.expand(input); - } - } - // ================================================================================ // PubsubIO translations // ================================================================================ http://git-wip-us.apache.org/repos/asf/beam/blob/16f355f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java index 396c305..30ecbf5 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -26,10 +26,8 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.dataflow.internal.CustomSources; import org.apache.beam.runners.dataflow.util.PropertyNames; -import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PValue; @@ -46,16 +44,6 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { PTransform<?, ? 
extends PValue> transform, TranslationContext context) { try { - if (source instanceof FileBasedSource) { - ValueProvider<String> filePatternOrSpec = - ((FileBasedSource<?>) source).getFileOrPatternSpecProvider(); - if (filePatternOrSpec.isAccessible()) { - context.getPipelineOptions() - .getPathValidator() - .validateInputFilePatternSupported(filePatternOrSpec.get()); - } - } - StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); stepContext.addInput( http://git-wip-us.apache.org/repos/asf/beam/blob/16f355f7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 48cb79f..9040f8f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -20,13 +20,10 @@ package org.apache.beam.runners.dataflow; import static org.apache.beam.runners.dataflow.util.Structs.addObject; import static org.apache.beam.runners.dataflow.util.Structs.getDictionary; import static org.apache.beam.runners.dataflow.util.Structs.getString; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.not; -import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; 
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -109,7 +106,6 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; -import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -809,28 +805,6 @@ public class DataflowPipelineTranslatorTest implements Serializable { pipeline.apply("Read(" + path + ")", TextIO.read().from(path)); } - /** - * Recursive wildcards are not supported. - * This tests "**". - */ - @Test - public void testBadWildcardRecursive() throws Exception { - DataflowPipelineOptions options = buildPipelineOptions(); - Pipeline pipeline = Pipeline.create(options); - DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options); - - pipeline.apply(TextIO.read().from("gs://bucket/foo**/baz")); - - // Check that translation does fail. - thrown.expectCause(allOf( - instanceOf(IllegalArgumentException.class), - ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage")))); - t.translate( - pipeline, - DataflowRunner.fromOptions(options), - Collections.<DataflowPackage>emptyList()); - } - private static class TestValueProvider implements ValueProvider<String>, Serializable { @Override public boolean isAccessible() { http://git-wip-us.apache.org/repos/asf/beam/blob/16f355f7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 3d47726..6848b85 100644 --- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; @@ -97,7 +96,6 @@ import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @@ -588,60 +586,6 @@ public class DataflowRunnerTest { } @Test - public void testNonGcsFilePathInReadFailure() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - p.apply("ReadMyNonGcsFile", TextIO.read().from(tmpFolder.newFile().getPath())); - - thrown.expectCause(Matchers.allOf( - instanceOf(IllegalArgumentException.class), - ThrowableMessageMatcher.hasMessage( - containsString("Expected a valid 'gs://' path but was given")))); - p.run(); - - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testNonGcsFilePathInWriteFailure() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - - p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object")) - .apply("WriteMyNonGcsFile", TextIO.write().to("/tmp/file")); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given")); - 
p.run(); - } - - @Test - public void testMultiSlashGcsFileReadPath() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - p.apply("ReadInvalidGcsFile", TextIO.read().from("gs://bucket/tmp//file")); - - thrown.expectCause(Matchers.allOf( - instanceOf(IllegalArgumentException.class), - ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes")))); - p.run(); - - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testMultiSlashGcsFileWritePath() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object")); - pc.apply("WriteInvalidGcsFile", TextIO.write().to("gs://bucket/tmp//file")); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("consecutive slashes"); - p.run(); - } - - @Test public void testInvalidGcpTempLocation() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); options.setGcpTempLocation("file://temp/location");
