echauchot commented on code in PR #24543:
URL: https://github.com/apache/beam/pull/24543#discussion_r1050531362
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -115,40 +108,27 @@ public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
   @Override
   public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
       throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    TupleTag<OutputT> mainOutputTag = transform.getMainOutputTag();
-
-    DoFnSchemaInformation doFnSchema =
-        ParDoTranslation.getSchemaInformation(cxt.getCurrentTransform());
     PCollection<InputT> input = (PCollection<InputT>) cxt.getInput();
-    Map<String, PCollectionView<?>> sideInputs = transform.getSideInputs();
     Map<TupleTag<?>, PCollection<?>> outputs = cxt.getOutputs();
-    DoFnMapPartitionsFactory<InputT, OutputT> factory =
Review Comment:
   I like these sub-types per single output and multi output, and the way you managed column indexes.
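   To make the column-index idea concrete, here is a minimal sketch (class and field names like `TaggedOutputManager` and `columnIdx` are illustrative, not the PR's) of a multi-output `DoFnRunners.OutputManager` that resolves each `TupleTag` to a precomputed column index:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.Map;
   import org.apache.beam.runners.core.DoFnRunners;
   import org.apache.beam.sdk.util.WindowedValue;
   import org.apache.beam.sdk.values.TupleTag;
   import scala.Tuple2;

   // Buffers each output as (column index, value) so downstream Spark code can
   // route values by index instead of serializing TupleTags per record.
   class TaggedOutputManager implements DoFnRunners.OutputManager {
     private final Map<TupleTag<?>, Integer> columnIdx; // tag -> output column
     private final Deque<Tuple2<Integer, WindowedValue<?>>> buffer = new ArrayDeque<>();

     TaggedOutputManager(Map<TupleTag<?>, Integer> columnIdx) {
       this.columnIdx = columnIdx;
     }

     @Override
     public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
       // Look up the precomputed index once per emitted value and buffer it.
       buffer.add(new Tuple2<>(columnIdx.get(tag), output));
     }
   }
   ```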
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java:
##########
@@ -17,91 +17,171 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-import static java.util.stream.Collectors.toCollection;
-import static java.util.stream.Collectors.toMap;
 import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.scalaIterator;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayListWithCapacity;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.tuple;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import java.io.Serializable;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.CachedSideInputReader;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
-import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun2;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
-import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Function1;
+import scala.Tuple2;
 import scala.collection.Iterator;
 /**
- * Encapsulates a {@link DoFn} inside a Spark {@link
- * org.apache.spark.api.java.function.MapPartitionsFunction}.
+ * Abstract factory to create a {@link DoFnPartitionIt DoFn partition iterator} using a customizable
+ * {@link DoFnRunners.OutputManager}.
  */
-class DoFnMapPartitionsFactory<InT, OutT> implements Serializable {
+abstract class DoFnMapPartitionsFactory<InT, FnOutT, OutT extends @NonNull Object>
+    implements Function1<Iterator<WindowedValue<InT>>, Iterator<OutT>>, Serializable {
   private final String stepName;
-
-  private final DoFn<InT, OutT> doFn;
+  private final DoFn<? super InT, FnOutT> doFn;
Review Comment:
   `? super InT` is only to comply with the return type of `AppliedPTransform#getTransform()` now that you get the DoFn from the `AppliedPTransform`, right?
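   To illustrate the variance with a self-contained example (the `FormatNumbers` DoFn below is hypothetical, not from the PR): a `DoFn` declared against a supertype of `InT` still consumes every `InT` element, which is exactly what the `? super InT` wildcard lets the field hold.

   ```java
   import org.apache.beam.sdk.transforms.DoFn;

   class VarianceDemo {
     // A DoFn written against the supertype Number...
     static class FormatNumbers extends DoFn<Number, String> {
       @ProcessElement
       public void process(@Element Number n, OutputReceiver<String> out) {
         out.output(n.toString());
       }
     }

     // ...can be assigned where Integer inputs are processed: the input type
     // behaves contravariantly, which `? super InT` expresses.
     static final DoFn<? super Integer, String> FN = new FormatNumbers();
   }
   ```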
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java:
##########
@@ -136,15 +211,15 @@
}
@Override
- protected OutputT computeNext() {
+ protected OutT computeNext() {
Review Comment:
   it was already clear, but now if a user asks the difference between a Beam bundle and a Spark partition, I could answer "none" and point to this code that invokes startBundle/finishBundle with each Spark partition.
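   For readers landing here, a hedged sketch of that pattern (simplified; `PartitionIt` and its wiring are illustrative, and the `OutputManager` that fills `buffer` is omitted): `startBundle()` runs before the first element of a partition, and `finishBundle()` once the partition's input is exhausted.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import org.apache.beam.runners.core.DoFnRunner;
   import org.apache.beam.sdk.util.WindowedValue;
   import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;

   class PartitionIt<InT, OutT> extends AbstractIterator<OutT> {
     private final DoFnRunner<InT, ?> runner;
     private final java.util.Iterator<WindowedValue<InT>> partition;
     final Deque<OutT> buffer = new ArrayDeque<>(); // filled via the runner's OutputManager (not shown)
     private boolean started = false;
     private boolean finished = false;

     PartitionIt(DoFnRunner<InT, ?> runner, java.util.Iterator<WindowedValue<InT>> partition) {
       this.runner = runner;
       this.partition = partition;
     }

     @Override
     protected OutT computeNext() {
       if (!started) {
         runner.startBundle(); // one Beam bundle == one Spark partition
         started = true;
       }
       while (buffer.isEmpty()) {
         if (partition.hasNext()) {
           runner.processElement(partition.next()); // may push outputs into buffer
         } else if (!finished) {
           runner.finishBundle(); // may also emit trailing outputs
           finished = true;
         } else {
           return endOfData();
         }
       }
       return buffer.remove();
     }
   }
   ```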
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java:
##########
@@ -17,91 +17,171 @@
 /**
- * Encapsulates a {@link DoFn} inside a Spark {@link
- * org.apache.spark.api.java.function.MapPartitionsFunction}.
+ * Abstract factory to create a {@link DoFnPartitionIt DoFn partition iterator} using a customizable
+ * {@link DoFnRunners.OutputManager}.
  */
-class DoFnMapPartitionsFactory<InT, OutT> implements Serializable {
+abstract class DoFnMapPartitionsFactory<InT, FnOutT, OutT extends @NonNull Object>
Review Comment:
   what is the aim of adding `extends @NonNull Object` to `OutT`?
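   Presumably it is for the Checker Framework's nullness checker: the explicit bound forbids instantiating `OutT` with a nullable type. A minimal illustration of what such a bound enforces (hypothetical `Holder` class, not from the PR):

   ```java
   import org.checkerframework.checker.nullness.qual.NonNull;
   import org.checkerframework.checker.nullness.qual.Nullable;

   // The bound restricts type arguments to non-null types.
   class Holder<T extends @NonNull Object> {
     private final T value;
     Holder(T value) { this.value = value; }
     T get() { return value; } // guaranteed non-null by the bound
   }

   class Demo {
     Holder<String> ok = new Holder<>("x");
     // Holder<@Nullable String> bad;  // rejected by the nullness checker
   }
   ```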
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java:
##########
@@ -17,91 +17,171 @@
-class DoFnMapPartitionsFactory<InT, OutT> implements Serializable {
+abstract class DoFnMapPartitionsFactory<InT, FnOutT, OutT extends @NonNull Object>
+    implements Function1<Iterator<WindowedValue<InT>>, Iterator<OutT>>, Serializable {
Review Comment:
   instead of enclosing the `Fun1`, you moved it one level up by implementing it directly. One more reduction of the closure!
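   A hedged sketch of the difference (class and field names illustrative): since Scala 2.12, `Function1` is a SAM interface that Java can implement directly, so the factory itself is the function Spark serializes, with no extra wrapper lambda capturing it.

   ```java
   import java.io.Serializable;
   import scala.Function1;
   import scala.collection.Iterator;

   // The factory IS the Function1: Spark serializes only this object's own
   // fields, not an enclosing lambda that would capture the whole factory again.
   abstract class PartitionFn<A, B> implements Function1<Iterator<A>, Iterator<B>>, Serializable {
     private final String stepName; // example of state serialized exactly once

     PartitionFn(String stepName) {
       this.stepName = stepName;
     }

     String stepName() {
       return stepName;
     }

     @Override
     public abstract Iterator<B> apply(Iterator<A> partition);
   }
   ```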
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java:
##########
@@ -17,91 +17,171 @@
 /**
- * Encapsulates a {@link DoFn} inside a Spark {@link
- * org.apache.spark.api.java.function.MapPartitionsFunction}.
+ * Abstract factory to create a {@link DoFnPartitionIt DoFn partition iterator} using a customizable
Review Comment:
   I know it is not your naming, but please rename `DoFnPartitionIt` to `DoFnPartitionIterator` for clarity.
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java:
##########
@@ -17,91 +17,171 @@
-class DoFnMapPartitionsFactory<InT, OutT> implements Serializable {
+abstract class DoFnMapPartitionsFactory<InT, FnOutT, OutT extends @NonNull Object>
+    implements Function1<Iterator<WindowedValue<InT>>, Iterator<OutT>>, Serializable {
   private final String stepName;
-
-  private final DoFn<InT, OutT> doFn;
+  private final DoFn<? super InT, FnOutT> doFn;
Review Comment:
   `OutT` and `FnOutT` can be different?
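   They plausibly can, which would be why both parameters exist. A hedged sketch (signatures illustrative, not the PR's) based on the single/multi output sub-types noted above:

   ```java
   import org.apache.beam.sdk.util.WindowedValue;
   import scala.Tuple2;

   // Stand-in for DoFnMapPartitionsFactory<InT, FnOutT, OutT> in this sketch.
   abstract class Factory<InT, FnOutT, OutT> {}

   // Single output: the iterator yields the DoFn's output directly,
   // so OutT = WindowedValue<FnOutT>.
   abstract class SingleOut<InT, FnOutT> extends Factory<InT, FnOutT, WindowedValue<FnOutT>> {}

   // Multi output: every value is paired with its output column index,
   // so OutT = Tuple2<Integer, WindowedValue<?>> whatever FnOutT is.
   abstract class MultiOut<InT, FnOutT> extends Factory<InT, FnOutT, Tuple2<Integer, WindowedValue<?>>> {}
   ```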
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java:
##########
@@ -17,91 +17,171 @@
-class DoFnMapPartitionsFactory<InT, OutT> implements Serializable {
+abstract class DoFnMapPartitionsFactory<InT, FnOutT, OutT extends @NonNull Object>
+    implements Function1<Iterator<WindowedValue<InT>>, Iterator<OutT>>, Serializable {
   private final String stepName;
-
-  private final DoFn<InT, OutT> doFn;
+  private final DoFn<? super InT, FnOutT> doFn;
   private final DoFnSchemaInformation doFnSchema;
   private final Supplier<PipelineOptions> options;
-
   private final Coder<InT> coder;
   private final WindowingStrategy<?, ?> windowingStrategy;
-  private final TupleTag<OutT> mainOutput;
+  private final TupleTag<FnOutT> mainOutput;
   private final List<TupleTag<?>> additionalOutputs;
   private final Map<TupleTag<?>, Coder<?>> outputCoders;
-
   private final Map<String, PCollectionView<?>> sideInputs;
   private final SideInputReader sideInputReader;
-  DoFnMapPartitionsFactory(
-      String stepName,
-      DoFn<InT, OutT> doFn,
-      DoFnSchemaInformation doFnSchema,
+  private DoFnMapPartitionsFactory(
+      AppliedPTransform<PCollection<? extends InT>, ?, MultiOutput<InT, FnOutT>> appliedPT,
Review Comment:
   the closure is reduced a lot!
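   For illustration, a hedged sketch of the pattern (field set trimmed, `FactorySketch` is a hypothetical name): the constructor derives everything the removed `ParDoTranslatorBatch` lines used to pass in from the single `AppliedPTransform` argument.

   ```java
   import org.apache.beam.runners.core.construction.ParDoTranslation;
   import org.apache.beam.sdk.runners.AppliedPTransform;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
   import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.beam.sdk.values.TupleTag;

   class FactorySketch<InT, FnOutT> {
     private final String stepName;
     private final DoFn<InT, FnOutT> doFn;
     private final DoFnSchemaInformation doFnSchema;
     private final TupleTag<FnOutT> mainOutput;

     FactorySketch(
         AppliedPTransform<PCollection<? extends InT>, ?, MultiOutput<InT, FnOutT>> appliedPT) {
       this.stepName = appliedPT.getFullName();      // was cxt.getCurrentTransform().getFullName()
       this.doFn = appliedPT.getTransform().getFn(); // no longer passed in by the translator
       this.doFnSchema = ParDoTranslation.getSchemaInformation(appliedPT);
       this.mainOutput = appliedPT.getTransform().getMainOutputTag();
     }
   }
   ```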
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]