This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to 
refs/heads/spark-runner_structured-streaming by this push:
     new 86c5f5c  Add explanation about receiving a Row as input in the combiner
86c5f5c is described below

commit 86c5f5ce2a7856272432f391b0e5f1ed4364901a
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Feb 28 09:52:05 2019 +0100

    Add explanation about receiving a Row as input in the combiner
---
 .../translation/batch/AggregatorCombinerGlobally.java                 | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 58f48e2..92aeea5 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -41,7 +41,9 @@ public class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
 
   @Override
   public AccumT reduce(AccumT accumulator, InputT input) {
-    // we receive a GenericRowWithSchema from spark containing an InputT
+    // Because the input type is the generic InputT, Spark cannot infer a concrete input type.
+    // It would pass e.g. an Integer directly if we had declared an Aggregator<Integer, ..., ...>;
+    // without that type information, Spark wraps the input in a GenericRowWithSchema.
     Row row = (Row) input;
     InputT t = RowHelpers.extractObjectFromRow(row);
     return combineFn.addInput(accumulator, t);
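For readers skimming the patch, the unwrap step the comment describes can be sketched outside Spark with a stand-in `Row` class (hypothetical; Spark's real `Row` and `GenericRowWithSchema` live in `org.apache.spark.sql`, and the extraction in Beam goes through `RowHelpers.extractObjectFromRow`). The point is only the shape of the cast: the runtime hands the combiner a `Row`, not an `InputT`, so the value must be pulled back out:

```java
public class RowUnwrapSketch {

  /** Stand-in for Spark's Row: a single-field container (hypothetical, for illustration only). */
  static final class Row {
    private final Object value;

    Row(Object value) {
      this.value = value;
    }

    Object get(int i) {
      return value;
    }
  }

  /**
   * Mirrors the reduce() in the patch: because InputT is erased at runtime,
   * Spark passes the element encoded as a Row, and we unwrap it ourselves.
   */
  @SuppressWarnings("unchecked")
  static <InputT> InputT extractFromRow(Object input) {
    Row row = (Row) input; // the framework encoded the element as a Row
    return (InputT) row.get(0); // recover the original InputT value
  }

  public static void main(String[] args) {
    Object fromSpark = new Row(42); // what the framework would pass in
    Integer value = RowUnwrapSketch.<Integer>extractFromRow(fromSpark);
    System.out.println(value); // prints 42
  }
}
```

Had the aggregator been declared with a concrete type such as `Aggregator<Integer, ..., ...>`, no such unwrapping would be needed, which is exactly what the new comment explains.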