[
https://issues.apache.org/jira/browse/BEAM-12021?focusedWorklogId=586826&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-586826
]
ASF GitHub Bot logged work on BEAM-12021:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Apr/21 19:22
Start Date: 21/Apr/21 19:22
Worklog Time Spent: 10m
Work Description: boyuanzz commented on a change in pull request #14276:
URL: https://github.com/apache/beam/pull/14276#discussion_r617775304
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
##########
@@ -172,11 +172,7 @@ private MultiOutputOverrideFactory(boolean isFnApi) {
verifyFnIsStateful(fn);
DataflowPipelineOptions options =
input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
- DataflowRunner.verifyDoFnSupported(
- fn,
- false,
- DataflowRunner.useUnifiedWorker(options),
Review comment:
Within this change, we can only fail a job that uses an unsupported stateful
DoFn after the job has been created.
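The comment above concerns when validation can run. As a hedged illustration (hypothetical names and feature set, not Beam's actual API), checking a DoFn's required features during pipeline construction surfaces the failure before any job is created on the service:

```java
// Hypothetical sketch of eager DoFn-feature validation, run at pipeline
// construction time rather than after job creation. Names are illustrative.
import java.util.EnumSet;
import java.util.Set;

public class EagerValidation {
    public enum Feature { STATE, TIMERS, REQUIRES_TIME_SORTED_INPUT }

    // Features the (hypothetical) runner mode supports.
    public static final Set<Feature> SUPPORTED = EnumSet.of(Feature.STATE, Feature.TIMERS);

    public static void verifySupported(Set<Feature> used) {
        for (Feature f : used) {
            if (!SUPPORTED.contains(f)) {
                // Throwing here, during construction, fails fast: no job is
                // ever submitted to the service.
                throw new UnsupportedOperationException("Unsupported DoFn feature: " + f);
            }
        }
    }

    public static void main(String[] args) {
        verifySupported(EnumSet.of(Feature.STATE)); // passes silently
        try {
            verifySupported(EnumSet.of(Feature.REQUIRES_TIME_SORTED_INPUT));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Deferring the same check until after job creation, as the change above does, means the user only sees the failure once the job already exists.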
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -499,45 +493,34 @@ protected DataflowRunner(DataflowPipelineOptions options) {
PTransformOverride.of(
PTransformMatchers.writeWithRunnerDeterminedSharding(),
new StreamingShardedWriteFactory(options)));
- if (fnApiEnabled) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Create.Values.class),
- new StreamingFnApiCreateOverrideFactory()));
- }
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.groupWithShardableStates(),
new GroupIntoBatchesOverride.StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(
this)));
- if (!fnApiEnabled) {
- overridesBuilder
- .add(
- // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
- // must precede it
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Read.Bounded.class),
- new StreamingBoundedReadOverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Read.Unbounded.class),
- new StreamingUnboundedReadOverrideFactory()));
- }
+ overridesBuilder
+ .add(
+ // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
+ // must precede it
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Bounded.class),
+ new StreamingBoundedReadOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Unbounded.class),
+ new StreamingUnboundedReadOverrideFactory()));
+
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+ new StreamingCreatePCollectionViewFactory()));
- if (!fnApiEnabled) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
- new StreamingCreatePCollectionViewFactory()));
- }
// Dataflow Streaming runner overrides the SPLITTABLE_PROCESS_KEYED transform
// natively in the Dataflow service.
} else {
- if (!fnApiEnabled) {
- overridesBuilder.add(SplittableParDo.PRIMITIVE_BOUNDED_READ_OVERRIDE);
- }
+ overridesBuilder.add(SplittableParDo.PRIMITIVE_BOUNDED_READ_OVERRIDE);
Review comment:
I just noticed that we only swap the SDF wrapper out in batch, and we are
still using the SDF wrapper in streaming on runner_v1 now. Since runner_v1 supports
Splittable DoFn, should we swap the read out for streaming as well?
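The ordering constraint noted in the diff ("Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and must precede it") can be sketched in plain Java. This is a simplified model with hypothetical types, not Beam's actual PTransformOverride machinery: overrides are applied in registration order, so the bounded-read rule must come first, because its output is itself an unbounded read that the later rule then replaces.

```java
// Simplified sketch of ordered transform overrides (hypothetical types,
// not Beam's real API): each rule rewrites a matching transform name, and
// rules are applied in list order over the current value.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class OverrideOrder {
    public record ReplacementRule(Predicate<String> matcher, Function<String, String> factory) {}

    public static String applyOverrides(String transform, List<ReplacementRule> rules) {
        String current = transform;
        for (ReplacementRule rule : rules) {
            if (rule.matcher().test(current)) {
                current = rule.factory().apply(current);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<ReplacementRule> rules = new ArrayList<>();
        // Bounded reads are rewritten in terms of unbounded reads first...
        rules.add(new ReplacementRule(s -> s.equals("Read.Bounded"), s -> "Read.Unbounded"));
        // ...so the unbounded-read rule must come later to catch the result.
        rules.add(new ReplacementRule(s -> s.equals("Read.Unbounded"), s -> "StreamingUnboundedRead"));
        System.out.println(applyOverrides("Read.Bounded", rules)); // StreamingUnboundedRead
    }
}
```

If the two rules were registered in the opposite order, a bounded read would be rewritten to an unbounded read that nothing ever expands, which is why the comment in the diff insists on the precedence.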
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 586826)
Time Spent: 9h (was: 8h 50m)
> PubsubReadIT failures: "Cannot nackAll on persisting checkpoint"
> ----------------------------------------------------------------
>
> Key: BEAM-12021
> URL: https://issues.apache.org/jira/browse/BEAM-12021
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp, test-failures
> Reporter: Tyson Hamilton
> Assignee: Kenneth Knowles
> Priority: P1
> Labels: currently-failing
> Fix For: Not applicable
>
> Time Spent: 9h
> Remaining Estimate: 0h
>
> * [org.apache.beam.sdk.io.gcp.pubsub.PubsubReadIT.testReadPubsubMessageId|https://ci-beam.apache.org/job/beam_PostCommit_Java/7332/testReport/junit/org.apache.beam.sdk.io.gcp.pubsub/PubsubReadIT/testReadPubsubMessageId_2/]
> * [org.apache.beam.sdk.io.gcp.pubsub.PubsubReadIT.testReadPublicData|https://ci-beam.apache.org/job/beam_PostCommit_Java/7332/testReport/junit/org.apache.beam.sdk.io.gcp.pubsub/PubsubReadIT/testReadPublicData_2/]
> Job:
> [https://console.cloud.google.com/dataflow/jobs/us-central1/2021-03-17_14_02_29-12611938587996322031?project=apache-beam-testing]
>
> Many worker errors (754) that look like:
> {code:java}
> 2021-03-17 20:47:29.000 PDT Error message from worker: generic::unknown:
> java.lang.IllegalStateException: Cannot nackAll on persisting checkpoint
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubCheckpoint.nackAll(PubsubUnboundedSource.java:308)
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubSource.createReader(PubsubUnboundedSource.java:1071)
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubSource.createReader(PubsubUnboundedSource.java:1012)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:963)
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:426)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner.calculateRestrictionSize(FnApiDoFnRunner.java:1182)
> org.apache.beam.fn.harness.FnApiDoFnRunner.trySplitForElementAndRestriction(FnApiDoFnRunner.java:1608)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1059)
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
> org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638)
> org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:246)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:200)
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
> org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
> org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:308)
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748) passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631
> {code}
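The top of the trace above is a precondition guard: `PubsubCheckpoint.nackAll` refuses to run once the checkpoint is being persisted. A minimal plain-Java sketch of that guard pattern (hypothetical and heavily simplified; not the actual `PubsubCheckpoint` implementation):

```java
// Sketch of a state guard like the one behind "Cannot nackAll on persisting
// checkpoint": once a checkpoint is handed off for persistence, the handle
// needed to nack messages is gone, so nackAll must fail fast.
import java.util.List;

public class CheckpointSketch {
    // Null once the checkpoint has been taken for persistence (hypothetical).
    private final List<String> notYetAcked;

    public CheckpointSketch(List<String> notYetAcked) {
        this.notYetAcked = notYetAcked;
    }

    public void nackAll() {
        if (notYetAcked == null) {
            // Mirrors Preconditions.checkState(...) at the top of the trace.
            throw new IllegalStateException("Cannot nackAll on persisting checkpoint");
        }
        // ... would return the messages to the subscription for redelivery ...
    }

    public static void main(String[] args) {
        try {
            new CheckpointSketch(null).nackAll();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the trace, the trigger is `createReader` calling `nackAll` on a checkpoint that has already entered persistence, which is why the failure surfaces during SDF progress/size calculation rather than in user code.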
--
This message was sent by Atlassian Jira
(v8.3.4#803005)