boyuanzz commented on a change in pull request #12519:
URL: https://github.com/apache/beam/pull/12519#discussion_r468874809
##########
File path:
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
##########
@@ -211,10 +211,6 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residual =
processContext.getTakenCheckpoint();
if (cont.shouldResume()) {
- checkState(
Review comment:
Is this check being removed because we have `checkDone` now?
##########
File path:
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
##########
@@ -630,4 +635,38 @@ public void tearDown() {
invoker = null;
}
}
+
+ /**
+ * Throws an {@link IllegalArgumentException} if the pipeline contains any primitive read
+ * transforms that have not been expanded to be executed as {@link DoFn splittable DoFns}.
+ */
+ public static void validateNoPrimitiveReads(Pipeline pipeline) {
+ pipeline.traverseTopologically(new ValidateNoPrimitiveReads());
+ }
+
+ /**
+ * A {@link org.apache.beam.sdk.Pipeline.PipelineVisitor} that ensures that the pipeline does not
+ * contain any primitive reads.
Review comment:
```suggestion
* contain any primitive reads when use_deprecated_read is not specified.
```
##########
File path:
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
##########
@@ -179,6 +180,11 @@ public DirectPipelineResult run(Pipeline pipeline) {
DisplayDataValidator.validatePipeline(pipeline);
DisplayDataValidator.validateOptions(options);
+ // TODO(BEAM-10670): Remove the deprecated Read and make the splittable DoFn the only option.
+ if (!(ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")
Review comment:
Is it possible to merge `beam_fn_api_use_deprecated_read` and
`use_deprecated_read` into a single `use_deprecated_read` experiment, since
they seem to do the same thing?
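One way to picture the merge being asked about is a single helper that treats both spellings as one switch. The following is only a minimal sketch in plain Java, not Beam code: the class `DeprecatedReadFlag` and its method `isEnabled` are hypothetical, and the experiments are modelled as a plain list of strings instead of going through `ExperimentalOptions`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper, not part of Beam: folds both flag spellings into one check. */
public class DeprecatedReadFlag {
  // The two experiment names discussed in this review comment.
  static final String PREFIXED_NAME = "beam_fn_api_use_deprecated_read";
  static final String SHORT_NAME = "use_deprecated_read";

  /** Returns true if either spelling is present; a null list means no experiments are set. */
  public static boolean isEnabled(List<String> experiments) {
    if (experiments == null) {
      return false;
    }
    return experiments.contains(SHORT_NAME) || experiments.contains(PREFIXED_NAME);
  }

  public static void main(String[] args) {
    System.out.println(isEnabled(Arrays.asList(SHORT_NAME)));
    System.out.println(isEnabled(Arrays.asList(PREFIXED_NAME)));
    System.out.println(isEnabled(Collections.<String>emptyList()));
  }
}
```

With a helper of this shape, callers would only ever ask one question, and the prefixed name could later be dropped in a single place.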
##########
File path:
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
##########
@@ -167,4 +180,55 @@ public void testBoundednessForUnboundedFn() {
"unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
.isBounded());
}
+
+ private static class FakeBoundedSource extends BoundedSource<String> {
+ @Override
+ public List<? extends BoundedSource<String>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Coder<String> getOutputCoder() {
+ return StringUtf8Coder.of();
+ }
+ }
+
+ @Test
+ public void testValidateThatThereAreNoPrimitiveReads() {
Review comment:
Can we add one more block that tests the `use_deprecated_read` case?
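To make the request concrete, here is a rough stand-alone model of the extra case. It is not written against Beam: `DeprecatedReadValidationSketch`, `validate`, and `rejects` are hypothetical names, and the behavior it encodes (primitive reads are rejected unless `use_deprecated_read` is set) is an assumption about the intended semantics, not something confirmed by the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Stand-alone model of the requested test case; none of these types are Beam classes. */
public class DeprecatedReadValidationSketch {
  /**
   * Hypothetical stand-in for the runner-side guard. Assumed behavior: a primitive
   * read is rejected unless the use_deprecated_read experiment is present.
   */
  static void validate(boolean hasPrimitiveRead, List<String> experiments) {
    boolean deprecatedRead = experiments.contains("use_deprecated_read");
    if (hasPrimitiveRead && !deprecatedRead) {
      throw new IllegalArgumentException("pipeline contains a primitive read");
    }
  }

  /** Returns true if validate() rejects the given combination. */
  static boolean rejects(boolean hasPrimitiveRead, List<String> experiments) {
    try {
      validate(hasPrimitiveRead, experiments);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    // Existing case: primitive read, no experiment set.
    System.out.println(rejects(true, Collections.<String>emptyList()));
    // Requested extra case: primitive read with use_deprecated_read set.
    System.out.println(rejects(true, Arrays.asList("use_deprecated_read")));
  }
}
```

In the real test the second case would presumably build the same pipeline with the experiment added to the options and assert that no exception is thrown.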