Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 336b3dc29 -> bad377c67


[BEAM-2211] DataflowRunner: remove validation of file read/write paths

Now that users can implement and register custom FileSystems,
we can no longer effectively validate the paths they read
files from or write files to. For example, a user could even
register file:// to point to some HDFS path.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e90e548
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e90e548
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e90e548

Branch: refs/heads/release-2.0.0
Commit: 9e90e54809d0e11774e8e923e9f9f19651f1d28f
Parents: 336b3dc
Author: Dan Halperin <[email protected]>
Authored: Mon May 8 15:37:29 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Mon May 8 20:19:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 60 --------------------
 .../beam/runners/dataflow/ReadTranslator.java   | 12 ----
 .../DataflowPipelineTranslatorTest.java         | 26 ---------
 .../runners/dataflow/DataflowRunnerTest.java    | 56 ------------------
 4 files changed, 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9e90e548/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5278a4a..250c064 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -84,13 +84,10 @@ import 
org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.PathValidator;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -98,7 +95,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
@@ -343,10 +339,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
                   PTransformMatchers.stateOrTimerParDoSingle(),
                   BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
 
-          // WriteFiles uses views internally
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(WriteFiles.class), new 
BatchWriteFactory(this)))
           .add(
               PTransformOverride.of(
                   
PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
@@ -805,58 +797,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
   }
 
-  private class BatchWriteFactory<T>
-      implements PTransformOverrideFactory<PCollection<T>, PDone, 
WriteFiles<T>> {
-    private final DataflowRunner runner;
-    private BatchWriteFactory(DataflowRunner dataflowRunner) {
-      this.runner = dataflowRunner;
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<T>, PDone> 
getReplacementTransform(
-        AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          new BatchWrite<>(runner, transform.getTransform()));
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
-      return Collections.emptyMap();
-    }
-  }
-
-  /**
-   * Specialized implementation which overrides
-   * {@link WriteFiles WriteFiles} to provide Google
-   * Cloud Dataflow specific path validation of {@link FileBasedSink}s.
-   */
-  private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> 
{
-    private final DataflowRunner runner;
-    private final WriteFiles<T> transform;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowRunner#apply()
-    public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) {
-      this.runner = runner;
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone expand(PCollection<T> input) {
-      ValueProvider<ResourceId> outputDirectory =
-          transform.getSink().getBaseOutputDirectoryProvider();
-      if (outputDirectory.isAccessible()) {
-        PathValidator validator = runner.options.getPathValidator();
-        validator.validateOutputResourceSupported(
-            outputDirectory.get().resolve("some-file", 
StandardResolveOptions.RESOLVE_FILE));
-      }
-      return transform.expand(input);
-    }
-  }
-
   // 
================================================================================
   // PubsubIO translations
   // 
================================================================================

http://git-wip-us.apache.org/repos/asf/beam/blob/9e90e548/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
index 396c305..30ecbf5 100755
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -26,10 +26,8 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.dataflow.internal.CustomSources;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
-import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 
@@ -46,16 +44,6 @@ class ReadTranslator implements 
TransformTranslator<Read.Bounded<?>> {
       PTransform<?, ? extends PValue> transform,
       TranslationContext context) {
     try {
-      if (source instanceof FileBasedSource) {
-        ValueProvider<String> filePatternOrSpec =
-            ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
-        if (filePatternOrSpec.isAccessible()) {
-          context.getPipelineOptions()
-              .getPathValidator()
-              .validateInputFilePatternSupported(filePatternOrSpec.get());
-        }
-      }
-
       StepTranslationContext stepContext = context.addStep(transform, 
"ParallelRead");
       stepContext.addInput(PropertyNames.FORMAT, 
PropertyNames.CUSTOM_SOURCE_FORMAT);
       stepContext.addInput(

http://git-wip-us.apache.org/repos/asf/beam/blob/9e90e548/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 48cb79f..9040f8f 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -20,13 +20,10 @@ package org.apache.beam.runners.dataflow;
 import static org.apache.beam.runners.dataflow.util.Structs.addObject;
 import static org.apache.beam.runners.dataflow.util.Structs.getDictionary;
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.not;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -109,7 +106,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -809,28 +805,6 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
     pipeline.apply("Read(" + path + ")", TextIO.read().from(path));
   }
 
-  /**
-   * Recursive wildcards are not supported.
-   * This tests "**".
-   */
-  @Test
-  public void testBadWildcardRecursive() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    Pipeline pipeline = Pipeline.create(options);
-    DataflowPipelineTranslator t = 
DataflowPipelineTranslator.fromOptions(options);
-
-    pipeline.apply(TextIO.read().from("gs://bucket/foo**/baz"));
-
-    // Check that translation does fail.
-    thrown.expectCause(allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(containsString("Unsupported 
wildcard usage"))));
-    t.translate(
-        pipeline,
-        DataflowRunner.fromOptions(options),
-        Collections.<DataflowPackage>emptyList());
-  }
-
   private static class TestValueProvider implements ValueProvider<String>, 
Serializable {
     @Override
     public boolean isAccessible() {

http://git-wip-us.apache.org/repos/asf/beam/blob/9e90e548/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 3d47726..6848b85 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
@@ -97,7 +96,6 @@ import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -588,60 +586,6 @@ public class DataflowRunnerTest {
   }
 
   @Test
-  public void testNonGcsFilePathInReadFailure() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    p.apply("ReadMyNonGcsFile", 
TextIO.read().from(tmpFolder.newFile().getPath()));
-
-    thrown.expectCause(Matchers.allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(
-            containsString("Expected a valid 'gs://' path but was given"))));
-    p.run();
-
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), 
jobCaptor.capture());
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testNonGcsFilePathInWriteFailure() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-
-    p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
-        .apply("WriteMyNonGcsFile", TextIO.write().to("/tmp/file"));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString("Expected a valid 'gs://' path but was 
given"));
-    p.run();
-  }
-
-  @Test
-  public void testMultiSlashGcsFileReadPath() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    p.apply("ReadInvalidGcsFile", TextIO.read().from("gs://bucket/tmp//file"));
-
-    thrown.expectCause(Matchers.allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(containsString("consecutive 
slashes"))));
-    p.run();
-
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), 
jobCaptor.capture());
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testMultiSlashGcsFileWritePath() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply("ReadMyGcsFile", 
TextIO.read().from("gs://bucket/object"));
-    pc.apply("WriteInvalidGcsFile", 
TextIO.write().to("gs://bucket/tmp//file"));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("consecutive slashes");
-    p.run();
-  }
-
-  @Test
   public void testInvalidGcpTempLocation() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setGcpTempLocation("file://temp/location");

Reply via email to