boyuanzz commented on a change in pull request #12519:
URL: https://github.com/apache/beam/pull/12519#discussion_r468874809



##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
##########
@@ -211,10 +211,6 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
     KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residual =
         processContext.getTakenCheckpoint();
     if (cont.shouldResume()) {
-      checkState(

Review comment:
       Is it because we have `checkDone` now?

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
##########
@@ -630,4 +635,38 @@ public void tearDown() {
       invoker = null;
     }
   }
+
+  /**
+   * Throws an {@link IllegalArgumentException} if the pipeline contains any primitive read
+   * transforms that have not been expanded to be executed as {@link DoFn splittable DoFns}.
+   */
+  public static void validateNoPrimitiveReads(Pipeline pipeline) {
+    pipeline.traverseTopologically(new ValidateNoPrimitiveReads());
+  }
+
+  /**
+   * A {@link org.apache.beam.sdk.Pipeline.PipelineVisitor} that ensures that the pipeline does not
+   * contain any primitive reads.

Review comment:
       ```suggestion
      * contain any primitive reads when use_deprecated_read is not specified.
   ```

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
##########
@@ -179,6 +180,11 @@ public DirectPipelineResult run(Pipeline pipeline) {
 
       DisplayDataValidator.validatePipeline(pipeline);
       DisplayDataValidator.validateOptions(options);
+      // TODO(BEAM-10670): Remove the deprecated Read and make the splittable DoFn the only option.
+      if (!(ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")

Review comment:
       Is it possible to merge `beam_fn_api_use_deprecated_read` and
`use_deprecated_read` into a single `use_deprecated_read` experiment, since
they seem to be the same?
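
       To illustrate the idea, here is a minimal, standalone sketch of a single
check that accepts either name. `DeprecatedReadExperiments` and
`usesDeprecatedRead` are hypothetical names that do not exist in Beam, and the
sketch works on a plain list of experiment strings rather than on
`PipelineOptions`, so treat it as an assumption about the shape of the check,
not as the actual change.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper (not part of Beam): treats both experiment names as one switch. */
final class DeprecatedReadExperiments {
  // Both names appear in this PR; grouping them as aliases is an assumption of this sketch.
  private static final Set<String> ALIASES =
      Collections.unmodifiableSet(
          new HashSet<>(
              Arrays.asList("use_deprecated_read", "beam_fn_api_use_deprecated_read")));

  private DeprecatedReadExperiments() {}

  /** Returns true if any of the given experiments requests the deprecated Read. */
  static boolean usesDeprecatedRead(List<String> experiments) {
    if (experiments == null) {
      return false;
    }
    for (String experiment : experiments) {
      if (ALIASES.contains(experiment)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Either spelling enables the deprecated path; unrelated experiments do not.
    System.out.println(usesDeprecatedRead(Arrays.asList("use_deprecated_read")));
    System.out.println(usesDeprecatedRead(Arrays.asList("beam_fn_api_use_deprecated_read")));
    System.out.println(usesDeprecatedRead(Arrays.asList("beam_fn_api")));
  }
}
```

       With a helper along these lines, each runner would consult one place
instead of repeating the two-name condition, and dropping the longer name later
would be a one-line change.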

##########
File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
##########
@@ -167,4 +180,55 @@ public void testBoundednessForUnboundedFn() {
                 "unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
             .isBounded());
   }
+
+  private static class FakeBoundedSource extends BoundedSource<String> {
+    @Override
+    public List<? extends BoundedSource<String>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return Collections.singletonList(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Coder<String> getOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+  }
+
+  @Test
+  public void testValidateThatThereAreNoPrimitiveReads() {

Review comment:
       Can we add a block that tests with `use_deprecated_read` set?



