[ https://issues.apache.org/jira/browse/BEAM-10670?focusedWorklogId=469435&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-469435 ]
ASF GitHub Bot logged work on BEAM-10670:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Aug/20 22:12
Start Date: 11/Aug/20 22:12
Worklog Time Spent: 10m
Work Description: boyuanzz commented on a change in pull request #12519:
URL: https://github.com/apache/beam/pull/12519#discussion_r468874809
##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
##########
@@ -211,10 +211,6 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residual =
processContext.getTakenCheckpoint();
if (cont.shouldResume()) {
- checkState(
Review comment:
Is it because we have `checkDone` now?
##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
##########
@@ -630,4 +635,38 @@ public void tearDown() {
invoker = null;
}
}
+
+ /**
+ * Throws an {@link IllegalArgumentException} if the pipeline contains any primitive read
+ * transforms that have not been expanded to be executed as {@link DoFn splittable DoFns}.
+ */
+ public static void validateNoPrimitiveReads(Pipeline pipeline) {
+ pipeline.traverseTopologically(new ValidateNoPrimitiveReads());
+ }
+
+ /**
+ * A {@link org.apache.beam.sdk.Pipeline.PipelineVisitor} that ensures that the pipeline does not
+ * contain any primitive reads.
Review comment:
```suggestion
* contain any primitive reads when use_deprecated_read is not specified.
```
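The visitor-based check described in this Javadoc can be sketched without Beam dependencies. This is a minimal model, not Beam's implementation: `Node`, `Visitor`, `traverse`, and the `isPrimitiveRead` marker are hypothetical stand-ins for the pipeline graph node, `Pipeline.PipelineVisitor`, `Pipeline.traverseTopologically`, and whatever test the real visitor uses to recognize an unexpanded `Read` primitive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Beam-free sketch of a visitor that rejects primitive reads. All types are hypothetical. */
final class NoPrimitiveReadsSketch {

  /** Hypothetical stand-in for a transform node; a node with parts is a composite. */
  static final class Node {
    final String name;
    final boolean isPrimitiveRead; // true for an unexpanded Read primitive
    final List<Node> parts;

    Node(String name, boolean isPrimitiveRead, Node... parts) {
      this.name = name;
      this.isPrimitiveRead = isPrimitiveRead;
      this.parts = new ArrayList<>(Arrays.asList(parts));
    }
  }

  /** Hypothetical stand-in for Pipeline.PipelineVisitor (primitive transforms only). */
  interface Visitor {
    void visitPrimitiveTransform(Node node);
  }

  /** Depth-first walk that hands every leaf (primitive) transform to the visitor. */
  static void traverse(Node node, Visitor visitor) {
    if (node.parts.isEmpty()) {
      visitor.visitPrimitiveTransform(node);
      return;
    }
    for (Node part : node.parts) {
      traverse(part, visitor);
    }
  }

  /** Throws IllegalArgumentException on the first primitive read it encounters. */
  static final class ValidateNoPrimitiveReads implements Visitor {
    @Override
    public void visitPrimitiveTransform(Node node) {
      if (node.isPrimitiveRead) {
        throw new IllegalArgumentException(
            "Primitive read " + node.name + " was not expanded to a splittable DoFn");
      }
    }
  }

  /** Entry point mirroring the shape of validateNoPrimitiveReads(Pipeline). */
  static void validateNoPrimitiveReads(Node root) {
    traverse(root, new ValidateNoPrimitiveReads());
  }
}
```

Failing fast on the first offending primitive keeps the visitor stateless; collecting every offender before throwing would be an equally valid design if a fuller error message is preferred.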
##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
##########
@@ -179,6 +180,11 @@ public DirectPipelineResult run(Pipeline pipeline) {
DisplayDataValidator.validatePipeline(pipeline);
DisplayDataValidator.validateOptions(options);
+ // TODO(BEAM-10670): Remove the deprecated Read and make the splittable DoFn the only option.
+ if (!(ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")
Review comment:
Is it possible to merge `beam_fn_api_use_deprecated_read` and `use_deprecated_read` into a
single `use_deprecated_read` experiment, since they seem to be the same?
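One way to act on this suggestion, sketched under assumptions: treat `use_deprecated_read` as the canonical experiment and keep honoring `beam_fn_api_use_deprecated_read` as an alias so existing pipelines keep working. `DeprecatedReadFlag`, `usesDeprecatedRead`, and the plain `List<String>` of experiments are hypothetical stand-ins for a runner helper built on `ExperimentalOptions.hasExperiment`.

```java
import java.util.List;

/** Hypothetical helper: a single check that accepts either experiment name. */
final class DeprecatedReadFlag {
  /** Canonical experiment name proposed in the review. */
  static final String USE_DEPRECATED_READ = "use_deprecated_read";
  /** Older name, still honored as an alias in this sketch. */
  static final String LEGACY_ALIAS = "beam_fn_api_use_deprecated_read";

  private DeprecatedReadFlag() {}

  /** Returns true if either experiment name appears in the (possibly null) list. */
  static boolean usesDeprecatedRead(List<String> experiments) {
    return experiments != null
        && (experiments.contains(USE_DEPRECATED_READ) || experiments.contains(LEGACY_ALIAS));
  }

  /** Assumed gating: validate for primitive reads only when neither flag is set. */
  static boolean shouldValidateNoPrimitiveReads(List<String> experiments) {
    return !usesDeprecatedRead(experiments);
  }
}
```

Routing every call site through one helper means the alias can later be dropped in a single place once the deprecated name is retired.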
##########
File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
##########
@@ -167,4 +180,55 @@ public void testBoundednessForUnboundedFn() {
"unbounded to unbounded", makeUnboundedCollection(pipeline),
unboundedFn)
.isBounded());
}
+
+ private static class FakeBoundedSource extends BoundedSource<String> {
+ @Override
+ public List<? extends BoundedSource<String>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Coder<String> getOutputCoder() {
+ return StringUtf8Coder.of();
+ }
+ }
+
+ @Test
+ public void testValidateThatThereAreNoPrimitiveReads() {
Review comment:
Can we add a test block that uses `use_deprecated_read`?
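A Beam-free sketch of the two cases such a test could cover: without the experiment a primitive read is rejected, and with `use_deprecated_read` the same pipeline passes. `DeprecatedReadTestSketch`, `Kind`, and `validate` are hypothetical; a real test would build a pipeline from `FakeBoundedSource` and use JUnit rather than boolean-returning methods.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the two test blocks; not Beam's API. */
final class DeprecatedReadTestSketch {

  /** Hypothetical kinds of primitive transform left in a pipeline after expansion. */
  enum Kind { PRIMITIVE_READ, SPLITTABLE_DO_FN, PAR_DO }

  /** Hypothetical runner check: rejects primitive reads unless use_deprecated_read is set. */
  static void validate(List<Kind> primitives, List<String> experiments) {
    if (experiments.contains("use_deprecated_read")) {
      return; // the deprecated primitive Read is allowed to remain
    }
    for (Kind kind : primitives) {
      if (kind == Kind.PRIMITIVE_READ) {
        throw new IllegalArgumentException("Found an unexpanded primitive read");
      }
    }
  }

  /** Case 1: no experiment set, so the primitive read must be rejected. */
  static boolean rejectsPrimitiveReadByDefault() {
    try {
      validate(Arrays.asList(Kind.PRIMITIVE_READ, Kind.PAR_DO), Collections.emptyList());
      return false;
    } catch (IllegalArgumentException expected) {
      return true;
    }
  }

  /** Case 2: with use_deprecated_read, the same primitives must pass validation. */
  static boolean allowsPrimitiveReadWithDeprecatedFlag() {
    validate(
        Arrays.asList(Kind.PRIMITIVE_READ, Kind.PAR_DO), Arrays.asList("use_deprecated_read"));
    return true;
  }

  /** A pipeline whose read was already expanded to a splittable DoFn passes either way. */
  static boolean acceptsExpandedRead() {
    validate(Arrays.asList(Kind.SPLITTABLE_DO_FN, Kind.PAR_DO), Collections.emptyList());
    return true;
  }
}
```

Keeping the flagged and unflagged cases side by side makes it obvious that the experiment is the only variable that changes the outcome.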
Issue Time Tracking
-------------------
Worklog Id: (was: 469435)
Time Spent: 1h 10m (was: 1h)
> Make non-portable Splittable DoFn the only option when executing Java "Read" transforms
> ---------------------------------------------------------------------------------------
>
> Key: BEAM-10670
> URL: https://issues.apache.org/jira/browse/BEAM-10670
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> All runners seem to be capable of migrating to splittable DoFn for
> non-portable execution, except for Dataflow runner v1, which will internalize
> the current primitive read implementation that is shared across runner
> implementations.