boyuanzz commented on a change in pull request #14276:
URL: https://github.com/apache/beam/pull/14276#discussion_r617775304
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
##########
@@ -172,11 +172,7 @@ private MultiOutputOverrideFactory(boolean isFnApi) {
verifyFnIsStateful(fn);
DataflowPipelineOptions options =
input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
- DataflowRunner.verifyDoFnSupported(
- fn,
- false,
- DataflowRunner.useUnifiedWorker(options),
Review comment:
With this change, we can only fail a job that uses an unsupported stateful
DoFn after the job has been created.
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -499,45 +493,34 @@ protected DataflowRunner(DataflowPipelineOptions options)
{
PTransformOverride.of(
PTransformMatchers.writeWithRunnerDeterminedSharding(),
new StreamingShardedWriteFactory(options)));
- if (fnApiEnabled) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Create.Values.class),
- new StreamingFnApiCreateOverrideFactory()));
- }
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.groupWithShardableStates(),
new GroupIntoBatchesOverride.StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(
this)));
- if (!fnApiEnabled) {
- overridesBuilder
- .add(
- // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
- // must precede it
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Read.Bounded.class),
- new StreamingBoundedReadOverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Read.Unbounded.class),
- new StreamingUnboundedReadOverrideFactory()));
- }
+ overridesBuilder
+ .add(
+ // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
+ // must precede it
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Bounded.class),
+ new StreamingBoundedReadOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Unbounded.class),
+ new StreamingUnboundedReadOverrideFactory()));
+
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+ new StreamingCreatePCollectionViewFactory()));
- if (!fnApiEnabled) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
- new StreamingCreatePCollectionViewFactory()));
- }
// Dataflow Streaming runner overrides the SPLITTABLE_PROCESS_KEYED transform
// natively in the Dataflow service.
} else {
- if (!fnApiEnabled) {
- overridesBuilder.add(SplittableParDo.PRIMITIVE_BOUNDED_READ_OVERRIDE);
- }
+ overridesBuilder.add(SplittableParDo.PRIMITIVE_BOUNDED_READ_OVERRIDE);
Review comment:
I just noticed that we only swap the SDF wrapper out in batch, while in
streaming on runner_v1 we now use the SDF wrapper. Although runner_v1 supports
Splittable DoFn, should we swap the read for streaming as well?