lukecwik commented on a change in pull request #15465:
URL: https://github.com/apache/beam/pull/15465#discussion_r720423147
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
##########
@@ -329,7 +329,12 @@ public void processElement(Object untypedElem) throws
Exception {
WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
- if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
+ // We use the state-cleanup timer to implementt onWindowExpiration, so
make sure to set it if
Review comment:
```suggestion
// We use the state-cleanup timer to implement onWindowExpiration, so
make sure to set it if
```
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
##########
@@ -431,22 +436,37 @@ private static void finishSpecifyingStateSpecs(
}
}
- private static void validateStateApplicableForInput(DoFn<?, ?> fn,
PCollection<?> input) {
+ private static void validateStateApplicableForInput(
+ DoFn<?, ?> fn, PCollection<?> input, @Nullable FieldAccessDescriptor
fieldAccessDescriptor) {
Coder<?> inputCoder = input.getCoder();
- checkArgument(
- inputCoder instanceof KvCoder,
- "%s requires its input to use %s in order to use state and timers.",
- ParDo.class.getSimpleName(),
- KvCoder.class.getSimpleName());
+ if (fieldAccessDescriptor == null) {
+ checkArgument(
+ inputCoder instanceof KvCoder,
+ "%s requires its input to either use %s or have a schema input in
order to use state and timers.",
Review comment:
Should we mention the `@StateKeyFields` annotation is necessary?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1695,7 +1699,7 @@ public Object restriction() {
// from timer family declaration.
String timerId =
timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)
- ? ""
+ ? timer.getDynamicTimerTag()
Review comment:
This looks like a bug, please split to separate PR.
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -396,6 +396,12 @@ public Duration getAllowedTimestampSkew() {
String value();
}
+ @Documented
Review comment:
javadoc
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
##########
@@ -142,69 +144,111 @@ private MultiOutputOverrideFactory() {}
}
}
- static class StatefulSingleOutputParDo<K, InputT, OutputT>
- extends PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> {
+ static class StatefulSingleOutputParDo<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
- private final ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo;
+ private final ParDo.SingleOutput<InputT, OutputT> originalParDo;
- StatefulSingleOutputParDo(ParDo.SingleOutput<KV<K, InputT>, OutputT>
originalParDo) {
+ StatefulSingleOutputParDo(ParDo.SingleOutput<InputT, OutputT>
originalParDo) {
this.originalParDo = originalParDo;
}
- ParDo.SingleOutput<KV<K, InputT>, OutputT> getOriginalParDo() {
+ ParDo.SingleOutput<InputT, OutputT> getOriginalParDo() {
return originalParDo;
}
@Override
- public PCollection<OutputT> expand(PCollection<KV<K, InputT>> input) {
- DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
+ @SuppressWarnings({"rawtypes"})
+ public PCollection<OutputT> expand(PCollection<InputT> input) {
+ DoFn fn = originalParDo.getFn();
verifyFnIsStateful(fn);
DataflowPipelineOptions options =
input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
DataflowRunner.verifyDoFnSupported(fn, false,
DataflowRunner.useStreamingEngine(options));
DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
- PTransform<
- PCollection<? extends KV<K, Iterable<KV<Instant,
WindowedValue<KV<K, InputT>>>>>>,
- PCollection<OutputT>>
+ PCollection keyedInput = input;
+ // ParDo does this in ParDo.MultiOutput.expand. However since we're
replacing
+ // ParDo.SingleOutput, the results
+ // of the initial expansion of ParDo.MultiOutput are thrown away, so we
need to add the key
+ // back in.
+ DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+ @Nullable FieldAccessDescriptor keyFieldAccess =
originalParDo.getKeyFieldsDescriptor();
+ if (keyFieldAccess != null) {
+ if (!input.hasSchema()) {
+ throw new IllegalArgumentException(
+ "Cannot specify a @StateKeyFields if not using a schema");
+ }
+ keyedInput = input.apply("Extract schema keys",
ParDo.getWithSchemaKeys(keyFieldAccess));
+ }
+ return continueExpand(keyedInput, fn);
+ }
+
+ @SuppressWarnings({"rawtypes"})
+ public <K, V> PCollection<OutputT> continueExpand(PCollection<KV<K, V>>
input, DoFn fn) {
+ ParDo.SingleOutput<KV<K, Iterable<KV<Instant, WindowedValue<KV<K,
V>>>>>, OutputT>
statefulParDo =
- ParDo.of(new
BatchStatefulDoFn<>(fn)).withSideInputs(originalParDo.getSideInputs());
+ ParDo.of(new BatchStatefulDoFn<K, V, OutputT>(fn))
+ .withSideInputs(originalParDo.getSideInputs());
return input.apply(new GbkBeforeStatefulParDo<>()).apply(statefulParDo);
}
}
- static class StatefulMultiOutputParDo<K, InputT, OutputT>
- extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
+ static class StatefulMultiOutputParDo<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollectionTuple> {
- private final ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo;
+ private final ParDo.MultiOutput<InputT, OutputT> originalParDo;
- StatefulMultiOutputParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT>
originalParDo) {
+ StatefulMultiOutputParDo(ParDo.MultiOutput<InputT, OutputT> originalParDo)
{
this.originalParDo = originalParDo;
}
@Override
- public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
- DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
+ @SuppressWarnings({"rawtypes"})
+ public PCollectionTuple expand(PCollection<InputT> input) {
+ DoFn fn = originalParDo.getFn();
verifyFnIsStateful(fn);
DataflowPipelineOptions options =
input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
DataflowRunner.verifyDoFnSupported(fn, false,
DataflowRunner.useStreamingEngine(options));
DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
- PTransform<
- PCollection<? extends KV<K, Iterable<KV<Instant,
WindowedValue<KV<K, InputT>>>>>>,
- PCollectionTuple>
+ PCollection keyedInput = input;
+ // ParDo does this in ParDo.MultiOutput.expand. However since we're
replacing
+ // ParDo.SingleOutput, the results
+ // of the initial expansion of ParDo.MultiOutput are thrown away, so we
need to add the key
+ // back Parin.
Review comment:
```suggestion
// back in.
```
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
##########
@@ -1015,12 +1303,129 @@ public TupleTagList getAdditionalOutputTags() {
return PCollectionViews.toAdditionalInputs(sideInputs.values());
}
+ public FieldAccessDescriptor getKeyFieldsDescriptor() {
+ return keyFieldsDescriptor;
+ }
+
+ @Internal
+ public DoFnSchemaInformation getDoFnSchemaInformation() {
+ return doFnSchemaInformation;
+ }
+
@Override
public String toString() {
return fn.toString();
}
}
+ public static <T> PTransform<PCollection<T>, PCollection<KV<Row, T>>>
getWithSchemaKeys(
+ FieldAccessDescriptor fieldAccessDescriptor) {
+ return new SchemaToKv<>(fieldAccessDescriptor);
+ }
+
+ private static class SchemaToKv<T> extends PTransform<PCollection<T>,
PCollection<KV<Row, T>>> {
+ private RowSelector rowSelector;
+ private final FieldAccessDescriptor fieldAccessDescriptor;
+
+ SchemaToKv(FieldAccessDescriptor fieldAccessDescriptor) {
+ this.fieldAccessDescriptor = fieldAccessDescriptor;
+ }
+
+ @Override
+ public PCollection<KV<Row, T>> expand(PCollection<T> input) {
+ Schema schema = input.getSchema();
+ TypeDescriptor<T> typeDescriptor = input.getTypeDescriptor();
+ SerializableFunction<T, Row> toRowFunction = input.getToRowFunction();
+ SerializableFunction<Row, T> fromRowFunction =
input.getFromRowFunction();
+
+ FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+ rowSelector = new RowSelectorContainer(schema, resolved, true);
+ Schema keySchema = SelectHelpers.getOutputSchema(schema, resolved);
+
+ return input
+ .apply(
+ "selectKeys",
+ ParDo.of(
+ new DoFn<T, KV<Row, T>>() {
+ @ProcessElement
+ public void process(
+ @Element Row row, // Beam will convert the element to
a row.
+ @Element T element, // Beam will return the original
element.
+ OutputReceiver<KV<Row, T>> o) {
+ o.output(KV.of(rowSelector.select(row), element));
+ }
+ }))
+ .setCoder(
+ KvCoder.of(
+ SchemaCoder.of(keySchema),
+ SchemaCoder.of(schema, typeDescriptor, toRowFunction,
fromRowFunction)));
+ }
+ }
+
+ public static class MultiOutputSchemaKeyFields<InputT, DoFnInputT, OutputT>
Review comment:
Would it make sense to make this a subclass of MultiOutput?
I would think that this would allow you to re-use the existing translation
logic and only update "which" transforms are translated.
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
##########
@@ -681,30 +696,349 @@ public void processElement(
}
};
+ TupleTag<Row> mainTag = new TupleTag<>();
PCollection<Row> output =
pipeline
.apply(
+ "Create values",
Create.of(
- KV.of("hello", Row.withSchema(type).addValue("a").build()),
- KV.of("hello", Row.withSchema(type).addValue("b").build()),
- KV.of("hello", Row.withSchema(type).addValue("c").build()),
- KV.of("hello",
Row.withSchema(type).addValue("d").build())))
- .apply(ParDo.of(fn))
+ Row.withSchema(type).addValues("a", "hello").build(),
+ Row.withSchema(type).addValues("b", "hello").build(),
+ Row.withSchema(type).addValues("c", "hello").build(),
+ Row.withSchema(type).addValues("d", "hello").build())
+ .withRowSchema(type))
+ .apply("run statetful fn", ParDo.of(fn).withOutputTags(mainTag,
TupleTagList.empty()))
Review comment:
```suggestion
.apply("Run stateful fn", ParDo.of(fn).withOutputTags(mainTag,
TupleTagList.empty()))
```
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
##########
@@ -224,10 +268,11 @@ public PCollectionTuple expand(PCollection<KV<K, InputT>>
input) {
// is not registered by default, so we explicitly set the relevant
coders.
checkState(
input.getCoder() instanceof KvCoder,
- "Input to a %s using state requires a %s, but the coder was %s",
+ "Input to a %s using state requires a %s, but the coder was %s.
PColleciton %s",
Review comment:
```suggestion
"Input to a %s using state requires a %s, but the coder was %s.
PCollection %s",
```
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -2449,7 +2467,13 @@ public void output(OutputT output) {
@Override
public InputT element() {
- return currentElement.getValue();
+ if (doFnSchemaInformation.getKeyFieldsDescriptor() != null) {
Review comment:
Is there a cleaner way to do this without imposing the cost of the `if`
on all non schema specific DoFns?
I was thinking that we could create another ProcessBundleContext base class
that has this override and similar variants for all the child classes and then
choose the appropriate variant when the FnApiDoFnRunner is being initialized.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]