reuvenlax commented on a change in pull request #15465:
URL: https://github.com/apache/beam/pull/15465#discussion_r726681098



##########
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
##########
@@ -249,7 +238,7 @@ public void process(ProcessContext ctxt) {}
     Environment env1 = Environments.getEnvironment(ptransform, 
rehydratedComponents).get();
     assertThat(
         env1,
-        
equalTo(components.toComponents().getEnvironmentsOrThrow(ptransform.getEnvironmentId())));
+        
equalTo(components.toComponents().getEnvironmentsOrThrow(ptransform.getEnvironmentId())));*/

Review comment:
       No - fixed

##########
File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
##########
@@ -100,7 +101,8 @@
               windowingStrategy,
               doFnSchemaInformation,
               sideInputMapping);
-      if (DoFnSignatures.signatureForDoFn(fn).usesState()) {
+      DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
+      if (signature.usesState() || signature.onWindowExpiration() != null) {

Review comment:
       @lukecwik would you prefer this be in a separate PR? It arguably fixes a 
separable bug.

##########
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
##########
@@ -142,69 +144,111 @@ private MultiOutputOverrideFactory() {}
     }
   }
 
-  static class StatefulSingleOutputParDo<K, InputT, OutputT>
-      extends PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> {
+  static class StatefulSingleOutputParDo<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
-    private final ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo;
+    private final ParDo.SingleOutput<InputT, OutputT> originalParDo;
 
-    StatefulSingleOutputParDo(ParDo.SingleOutput<KV<K, InputT>, OutputT> 
originalParDo) {
+    StatefulSingleOutputParDo(ParDo.SingleOutput<InputT, OutputT> 
originalParDo) {
       this.originalParDo = originalParDo;
     }
 
-    ParDo.SingleOutput<KV<K, InputT>, OutputT> getOriginalParDo() {
+    ParDo.SingleOutput<InputT, OutputT> getOriginalParDo() {
       return originalParDo;
     }
 
     @Override
-    public PCollection<OutputT> expand(PCollection<KV<K, InputT>> input) {
-      DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
+    @SuppressWarnings({"rawtypes"})
+    public PCollection<OutputT> expand(PCollection<InputT> input) {
+      DoFn fn = originalParDo.getFn();
       verifyFnIsStateful(fn);
       DataflowPipelineOptions options =
           input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
       DataflowRunner.verifyDoFnSupported(fn, false, 
DataflowRunner.useStreamingEngine(options));
       
DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
 
-      PTransform<
-              PCollection<? extends KV<K, Iterable<KV<Instant, 
WindowedValue<KV<K, InputT>>>>>>,
-              PCollection<OutputT>>
+      PCollection keyedInput = input;
+      // ParDo does this in ParDo.MultiOutput.expand. However since we're 
replacing
+      // ParDo.SingleOutput, the results
+      // of the initial expansion of ParDo.MultiOutput are thrown away, so we 
need to add the key
+      // back in.
+      DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+      @Nullable FieldAccessDescriptor keyFieldAccess = 
originalParDo.getKeyFieldsDescriptor();
+      if (keyFieldAccess != null) {
+        if (!input.hasSchema()) {
+          throw new IllegalArgumentException(
+              "Cannot specify a @StateKeyFields if not using a schema");
+        }
+        keyedInput = input.apply("Extract schema keys", 
ParDo.getWithSchemaKeys(keyFieldAccess));
+      }
+      return continueExpand(keyedInput, fn);
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    public <K, V> PCollection<OutputT> continueExpand(PCollection<KV<K, V>> 
input, DoFn fn) {
+      ParDo.SingleOutput<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, 
V>>>>>, OutputT>
           statefulParDo =
-              ParDo.of(new 
BatchStatefulDoFn<>(fn)).withSideInputs(originalParDo.getSideInputs());
+              ParDo.of(new BatchStatefulDoFn<K, V, OutputT>(fn))
+                  .withSideInputs(originalParDo.getSideInputs());
 
       return input.apply(new GbkBeforeStatefulParDo<>()).apply(statefulParDo);
     }
   }
 
-  static class StatefulMultiOutputParDo<K, InputT, OutputT>
-      extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
+  static class StatefulMultiOutputParDo<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionTuple> {
 
-    private final ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo;
+    private final ParDo.MultiOutput<InputT, OutputT> originalParDo;
 
-    StatefulMultiOutputParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT> 
originalParDo) {
+    StatefulMultiOutputParDo(ParDo.MultiOutput<InputT, OutputT> originalParDo) 
{
       this.originalParDo = originalParDo;
     }
 
     @Override
-    public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
-      DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
+    @SuppressWarnings({"rawtypes"})
+    public PCollectionTuple expand(PCollection<InputT> input) {
+      DoFn fn = originalParDo.getFn();
       verifyFnIsStateful(fn);
       DataflowPipelineOptions options =
           input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
       DataflowRunner.verifyDoFnSupported(fn, false, 
DataflowRunner.useStreamingEngine(options));
       
DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
 
-      PTransform<
-              PCollection<? extends KV<K, Iterable<KV<Instant, 
WindowedValue<KV<K, InputT>>>>>>,
-              PCollectionTuple>
+      PCollection keyedInput = input;
+      // ParDo does this in ParDo.MultiOutput.expand. However since we're 
replacing
+      // ParDo.SingleOutput, the results
+      // of the initial expansion of ParDo.MultiOutput are thrown away, so we 
need to add the key
+      // back Parin.

Review comment:
       done

##########
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
##########
@@ -224,10 +268,11 @@ public PCollectionTuple expand(PCollection<KV<K, InputT>> 
input) {
       // is not registered by default, so we explicitly set the relevant 
coders.
       checkState(
           input.getCoder() instanceof KvCoder,
-          "Input to a %s using state requires a %s, but the coder was %s",
+          "Input to a %s using state requires a %s, but the coder was %s. 
PColleciton %s",

Review comment:
       done

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
##########
@@ -329,7 +329,12 @@ public void processElement(Object untypedElem) throws 
Exception {
 
     WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
 
-    if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
+    // We use the state-cleanup timer to implementt onWindowExpiration, so 
make sure to set it if

Review comment:
       done

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -396,6 +396,12 @@ public Duration getAllowedTimestampSkew() {
     String value();
   }
 
+  @Documented

Review comment:
       done

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -2449,7 +2467,13 @@ public void output(OutputT output) {
 
     @Override
     public InputT element() {
-      return currentElement.getValue();
+      if (doFnSchemaInformation.getKeyFieldsDescriptor() != null) {

Review comment:
       Yeah, that makes sense and is similar to what we do with 
WindowObservingProcessBundleContext. Do you think it also makes sense to 
memoize things? (e.g. the schema key accessor currently parses the key out of 
the record every time it is called.)

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
##########
@@ -1015,12 +1303,129 @@ public TupleTagList getAdditionalOutputTags() {
       return PCollectionViews.toAdditionalInputs(sideInputs.values());
     }
 
+    public FieldAccessDescriptor getKeyFieldsDescriptor() {
+      return keyFieldsDescriptor;
+    }
+
+    @Internal
+    public DoFnSchemaInformation getDoFnSchemaInformation() {
+      return doFnSchemaInformation;
+    }
+
     @Override
     public String toString() {
       return fn.toString();
     }
   }
 
+  public static <T> PTransform<PCollection<T>, PCollection<KV<Row, T>>> 
getWithSchemaKeys(
+      FieldAccessDescriptor fieldAccessDescriptor) {
+    return new SchemaToKv<>(fieldAccessDescriptor);
+  }
+
+  private static class SchemaToKv<T> extends PTransform<PCollection<T>, 
PCollection<KV<Row, T>>> {
+    private RowSelector rowSelector;
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+
+    SchemaToKv(FieldAccessDescriptor fieldAccessDescriptor) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+    }
+
+    @Override
+    public PCollection<KV<Row, T>> expand(PCollection<T> input) {
+      Schema schema = input.getSchema();
+      TypeDescriptor<T> typeDescriptor = input.getTypeDescriptor();
+      SerializableFunction<T, Row> toRowFunction = input.getToRowFunction();
+      SerializableFunction<Row, T> fromRowFunction = 
input.getFromRowFunction();
+
+      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      rowSelector = new RowSelectorContainer(schema, resolved, true);
+      Schema keySchema = SelectHelpers.getOutputSchema(schema, resolved);
+
+      return input
+          .apply(
+              "selectKeys",
+              ParDo.of(
+                  new DoFn<T, KV<Row, T>>() {
+                    @ProcessElement
+                    public void process(
+                        @Element Row row, // Beam will convert the element to 
a row.
+                        @Element T element, // Beam will return the original 
element.
+                        OutputReceiver<KV<Row, T>> o) {
+                      o.output(KV.of(rowSelector.select(row), element));
+                    }
+                  }))
+          .setCoder(
+              KvCoder.of(
+                  SchemaCoder.of(keySchema),
+                  SchemaCoder.of(schema, typeDescriptor, toRowFunction, 
fromRowFunction)));
+    }
+  }
+
+  public static class MultiOutputSchemaKeyFields<InputT, DoFnInputT, OutputT>

Review comment:
       This would help somewhat, but not completely. We would still need to 
register separate transform translators, as those are based off of the class 
itself, not instanceof.
   
   This would simplify translation logic a bit, but it comes with a typing 
challenge. MultiOutput today assumes that the DoFn input type is the same as 
the PCollection input type. In MultiOutputSchemaKeyFields this is not the case 
- the PCollection input is a KV while the DoFn input is the original type. 
Currently this blocks me from making one a subclass of the other unless I do 
some strange finagling of types.

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
##########
@@ -681,30 +696,349 @@ public void processElement(
           }
         };
 
+    TupleTag<Row> mainTag = new TupleTag<>();
     PCollection<Row> output =
         pipeline
             .apply(
+                "Create values",
                 Create.of(
-                    KV.of("hello", Row.withSchema(type).addValue("a").build()),
-                    KV.of("hello", Row.withSchema(type).addValue("b").build()),
-                    KV.of("hello", Row.withSchema(type).addValue("c").build()),
-                    KV.of("hello", 
Row.withSchema(type).addValue("d").build())))
-            .apply(ParDo.of(fn))
+                        Row.withSchema(type).addValues("a", "hello").build(),
+                        Row.withSchema(type).addValues("b", "hello").build(),
+                        Row.withSchema(type).addValues("c", "hello").build(),
+                        Row.withSchema(type).addValues("d", "hello").build())
+                    .withRowSchema(type))
+            .apply("run statetful fn", ParDo.of(fn).withOutputTags(mainTag, 
TupleTagList.empty()))

Review comment:
       done

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
##########
@@ -431,22 +436,37 @@ private static void finishSpecifyingStateSpecs(
     }
   }
 
-  private static void validateStateApplicableForInput(DoFn<?, ?> fn, 
PCollection<?> input) {
+  private static void validateStateApplicableForInput(
+      DoFn<?, ?> fn, PCollection<?> input, @Nullable FieldAccessDescriptor 
fieldAccessDescriptor) {
     Coder<?> inputCoder = input.getCoder();
-    checkArgument(
-        inputCoder instanceof KvCoder,
-        "%s requires its input to use %s in order to use state and timers.",
-        ParDo.class.getSimpleName(),
-        KvCoder.class.getSimpleName());
+    if (fieldAccessDescriptor == null) {
+      checkArgument(
+          inputCoder instanceof KvCoder,
+          "%s requires its input to either use %s or have a schema input in 
order to use state and timers.",

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to