taegeonum commented on a change in pull request #150: [NEMO-269] Direct
translation from Beam DAG to Nemo DAG
URL: https://github.com/apache/incubator-nemo/pull/150#discussion_r231004276
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
##########
@@ -258,429 +237,139 @@ private static void windowTranslator(final
TranslationContext ctx,
}
final IRVertex vertex = new OperatorVertex(new
WindowFnTransform(windowFn));
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input ->
ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().values().forEach(output ->
ctx.registerMainOutputFrom(vertex, output));
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex,
input));
+ beamNode.getOutputs().values().forEach(output ->
ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@PrimitiveTransformTranslator(View.CreatePCollectionView.class)
- private static void createPCollectionViewTranslator(final TranslationContext
ctx,
- final
PrimitiveTransformVertex transformVertex,
+ private static void createPCollectionViewTranslator(final
PipelineTranslationContext ctx,
+ final
TransformHierarchy.Node beamNode,
final
View.CreatePCollectionView<?, ?> transform) {
final IRVertex vertex = new OperatorVertex(new
CreateViewTransform(transform.getView().getViewFn()));
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input ->
ctx.addEdgeTo(vertex, input));
- ctx.registerMainOutputFrom(vertex, transform.getView());
- transformVertex.getNode().getOutputs().values().forEach(output ->
ctx.registerMainOutputFrom(vertex, output));
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex,
input));
+ ctx.registerMainOutputFrom(beamNode, vertex, transform.getView());
+ beamNode.getOutputs().values().forEach(output ->
ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@PrimitiveTransformTranslator(Flatten.PCollections.class)
- private static void flattenTranslator(final TranslationContext ctx,
- final PrimitiveTransformVertex
transformVertex,
+ private static void flattenTranslator(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
final Flatten.PCollections<?>
transform) {
final IRVertex vertex = new OperatorVertex(new FlattenTransform());
ctx.addVertex(vertex);
- transformVertex.getNode().getInputs().values().forEach(input ->
ctx.addEdgeTo(vertex, input));
- transformVertex.getNode().getOutputs().values().forEach(output ->
ctx.registerMainOutputFrom(vertex, output));
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex,
input));
+ beamNode.getOutputs().values().forEach(output ->
ctx.registerMainOutputFrom(beamNode, vertex, output));
}
- /**
- * Default translator for CompositeTransforms. Translates inner DAG without
modifying {@link TranslationContext}.
- *
- * @param ctx provides translation context
- * @param transformVertex the given CompositeTransform to translate
- * @param transform transform which can be obtained from {@code
transformVertex}
- */
- @CompositeTransformTranslator(PTransform.class)
- private static void topologicalTranslator(final TranslationContext ctx,
- final CompositeTransformVertex
transformVertex,
- final PTransform<?, ?> transform) {
- transformVertex.getDAG().topologicalDo(ctx::translate);
- }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /////////////////////// COMPOSITE TRANSFORMS
/**
- * Translator for Combine transform. Implements local combining before
shuffling key-value pairs.
+ * {@link Combine.PerKey} = {@link GroupByKey} + {@link
Combine.GroupedValues}
+ * ({@link Combine.Globally} internally uses {@link Combine.PerKey} which
will also be optimized by this translator)
+ * Here, we translate this composite transform as a whole, exploiting its
accumulator semantics.
*
* @param ctx provides translation context
- * @param transformVertex the given CompositeTransform to translate
- * @param transform transform which can be obtained from {@code
transformVertex}
+ * @param beamNode the given CompositeTransform to translate
+ * @param transform transform which can be obtained from {@code beamNode}
*/
- @CompositeTransformTranslator({Combine.Globally.class, Combine.PerKey.class,
Combine.GroupedValues.class})
- private static void combineTranslator(final TranslationContext ctx,
- final CompositeTransformVertex
transformVertex,
- final PTransform<?, ?> transform) {
- // No optimization for BeamSQL that handles Beam 'Row's.
- final boolean handlesBeamRow = Stream
- .concat(transformVertex.getNode().getInputs().values().stream(),
- transformVertex.getNode().getOutputs().values().stream())
- .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output
of combine should be KV
- .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) //
We're interested in the 'Value' of KV
- .anyMatch(valueTypeDescriptor ->
TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
- if (handlesBeamRow) {
- transformVertex.getDAG().topologicalDo(ctx::translate);
- return; // return early and give up optimization - TODO #209: Enable
Local Combiner for BeamSQL
- }
-
- // Local combiner optimization
- final List<TransformVertex> topologicalOrdering =
transformVertex.getDAG().getTopologicalSort();
- final TransformVertex groupByKeyBeamTransform = topologicalOrdering.get(0);
- final TransformVertex last =
topologicalOrdering.get(topologicalOrdering.size() - 1);
- if (groupByKeyBeamTransform.getNode().getTransform() instanceof
GroupByKey) {
- // Translate the given CompositeTransform under OneToOneEdge-enforced
context.
- final TranslationContext oneToOneEdgeContext = new
TranslationContext(ctx,
- OneToOneCommunicationPatternSelector.INSTANCE);
- transformVertex.getDAG().topologicalDo(oneToOneEdgeContext::translate);
-
- // Attempt to translate the CompositeTransform again.
- // Add GroupByKey, which is the first transform in the given
CompositeTransform.
- // Make sure it consumes the output from the last vertex in
OneToOneEdge-translated hierarchy.
- final IRVertex groupByKeyIRVertex = new
OperatorVertex(createGBKTransform(ctx, transformVertex));
- ctx.addVertex(groupByKeyIRVertex);
- last.getNode().getOutputs().values().forEach(outputFromCombiner
- -> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
- groupByKeyBeamTransform.getNode().getOutputs().values()
- .forEach(outputFromGroupByKey ->
ctx.registerMainOutputFrom(groupByKeyIRVertex, outputFromGroupByKey));
-
- // Translate the remaining vertices.
- topologicalOrdering.stream().skip(1).forEach(ctx::translate);
- } else {
- transformVertex.getDAG().topologicalDo(ctx::translate);
- }
+ @CompositeTransformTranslator(Combine.PerKey.class)
+ private static Pipeline.PipelineVisitor.CompositeBehavior
combinePerKeyTranslator(
+ final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
+ final PTransform<?, ?> transform) {
+ // TODO #260: Beam Accumulator-based Partial Aggregation
+ return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
}
/**
- * Pushes the loop vertex to the stack before translating the inner DAG, and
pops it after the translation.
- *
* @param ctx provides translation context
- * @param transformVertex the given CompositeTransform to translate
- * @param transform transform which can be obtained from {@code
transformVertex}
+ * @param beamNode the given CompositeTransform to translate
+ * @param transform transform which can be obtained from {@code beamNode}
+ * @
*/
@CompositeTransformTranslator(LoopCompositeTransform.class)
- private static void loopTranslator(final TranslationContext ctx,
- final CompositeTransformVertex
transformVertex,
- final LoopCompositeTransform<?, ?>
transform) {
- final LoopVertex loopVertex = new
LoopVertex(transformVertex.getNode().getFullName());
- ctx.builder.addVertex(loopVertex, ctx.loopVertexStack);
- ctx.builder.removeVertex(loopVertex);
- ctx.loopVertexStack.push(loopVertex);
- topologicalTranslator(ctx, transformVertex, transform);
- ctx.loopVertexStack.pop();
+ private static Pipeline.PipelineVisitor.CompositeBehavior loopTranslator(
+ final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
+ final LoopCompositeTransform<?, ?> transform) {
+ // Do nothing here, as the context handles the loop vertex stack.
+ // We just keep this method to signal that the loop vertex is acknowledged.
+ return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
}
- private DAG<IRVertex, IREdge> translateToIRDAG(final
CompositeTransformVertex vertex,
- final Pipeline pipeline,
- final PipelineOptions
pipelineOptions) {
- final TranslationContext ctx = new TranslationContext(vertex, pipeline,
primitiveTransformToTranslator,
- compositeTransformToTranslator,
DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
- ctx.translate(vertex);
- return ctx.builder.build();
- }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /////////////////////// HELPER METHODS
- /**
- * Annotates translator for PrimitiveTransform.
- */
- @Target(ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- private @interface PrimitiveTransformTranslator {
- Class<? extends PTransform>[] value();
- }
-
- /**
- * Annotates translator for CompositeTransform.
- */
- @Target(ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- private @interface CompositeTransformTranslator {
- Class<? extends PTransform>[] value();
- }
+ private static DoFnTransform createDoFnTransform(final
PipelineTranslationContext ctx,
+ final
TransformHierarchy.Node beamNode) {
+ try {
+ final AppliedPTransform pTransform =
beamNode.toAppliedPTransform(ctx.getPipeline());
+ final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
+ final TupleTag mainOutputTag =
ParDoTranslation.getMainOutputTag(pTransform);
+ final List<PCollectionView<?>> sideInputs =
ParDoTranslation.getSideInputs(pTransform);
+ final TupleTagList additionalOutputTags =
ParDoTranslation.getAdditionalOutputTags(pTransform);
- private static Coder<?> getCoder(final PValue input, final
CompositeTransformVertex pipeline) {
- final Coder<?> coder;
- if (input instanceof PCollection) {
- coder = ((PCollection) input).getCoder();
- } else if (input instanceof PCollectionView) {
- coder = getCoderForView((PCollectionView) input, pipeline);
- } else {
- throw new RuntimeException(String.format("Coder for PValue %s cannot be
determined", input));
- }
- return coder;
- }
+ final PCollection<?> mainInput = (PCollection<?>)
+
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
- /**
- * Get appropriate coder for {@link PCollectionView}.
- *
- * @param view {@link PCollectionView} from the corresponding {@link
View.CreatePCollectionView} transform
- * @return appropriate {@link Coder} for {@link PCollectionView}
- */
- private static Coder<?> getCoderForView(final PCollectionView view, final
CompositeTransformVertex pipeline) {
- final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
- final Coder<?> baseCoder = src.getNode().getOutputs().values().stream()
- .filter(v -> v instanceof PCollection)
- .map(v -> (PCollection) v)
- .findFirst()
- .orElseThrow(() -> new RuntimeException(String.format("No incoming
PCollection to %s", src)))
- .getCoder();
- final KvCoder<?, ?> inputKVCoder = (KvCoder) baseCoder;
- final ViewFn viewFn = view.getViewFn();
- if (viewFn instanceof PCollectionViews.IterableViewFn) {
- return IterableCoder.of(inputKVCoder.getValueCoder());
- } else if (viewFn instanceof PCollectionViews.ListViewFn) {
- return ListCoder.of(inputKVCoder.getValueCoder());
- } else if (viewFn instanceof PCollectionViews.MapViewFn) {
- return MapCoder.of(inputKVCoder.getKeyCoder(),
inputKVCoder.getValueCoder());
- } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
- return MapCoder.of(inputKVCoder.getKeyCoder(),
IterableCoder.of(inputKVCoder.getValueCoder()));
- } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
- return baseCoder;
- } else {
- throw new UnsupportedOperationException(String.format("Unsupported
viewFn %s", viewFn.getClass()));
+ return new DoFnTransform(
+ doFn,
+ mainInput.getCoder(),
+ getOutputCoders(pTransform),
+ mainOutputTag,
+ additionalOutputTags.getAll(),
+ mainInput.getWindowingStrategy(),
+ sideInputs,
+ ctx.getPipelineOptions());
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
}
}
- /**
- * Translation context.
- */
- private static final class TranslationContext {
- private final CompositeTransformVertex root;
- private final PipelineOptions pipelineOptions;
- private final DAGBuilder<IRVertex, IREdge> builder;
- private final Map<PValue, IRVertex> pValueToProducer;
- private final Map<PValue, TupleTag<?>> pValueToTag;
- private final Stack<LoopVertex> loopVertexStack;
- private final BiFunction<IRVertex, IRVertex,
CommunicationPatternProperty.Value> communicationPatternSelector;
- private final Pipeline pipeline;
-
-
- private final Map<Class<? extends PTransform>, Method>
primitiveTransformToTranslator;
- private final Map<Class<? extends PTransform>, Method>
compositeTransformToTranslator;
-
- /**
- * @param root the root to translate
- * @param pipeline the pipeline to translate
- * @param primitiveTransformToTranslator provides translators for
PrimitiveTransform
- * @param compositeTransformToTranslator provides translators for
CompositeTransform
- * @param selector provides {@link CommunicationPatternProperty.Value} for
IR edges
- * @param pipelineOptions {@link PipelineOptions}
- */
- private TranslationContext(final CompositeTransformVertex root,
- final Pipeline pipeline,
- final Map<Class<? extends PTransform>, Method>
primitiveTransformToTranslator,
- final Map<Class<? extends PTransform>, Method>
compositeTransformToTranslator,
- final BiFunction<IRVertex, IRVertex,
CommunicationPatternProperty.Value> selector,
- final PipelineOptions pipelineOptions) {
- this.root = root;
- this.pipeline = pipeline;
- this.builder = new DAGBuilder<>();
- this.pValueToProducer = new HashMap<>();
- this.pValueToTag = new HashMap<>();
- this.loopVertexStack = new Stack<>();
- this.primitiveTransformToTranslator = primitiveTransformToTranslator;
- this.compositeTransformToTranslator = compositeTransformToTranslator;
- this.communicationPatternSelector = selector;
- this.pipelineOptions = pipelineOptions;
- }
-
- /**
- * Copy constructor, except for setting different
CommunicationPatternProperty selector.
- *
- * @param ctx the original {@link TranslationContext}
- * @param selector provides {@link CommunicationPatternProperty.Value} for
IR edges
- */
- private TranslationContext(final TranslationContext ctx,
- final BiFunction<IRVertex, IRVertex,
CommunicationPatternProperty.Value> selector) {
- this.root = ctx.root;
- this.pipeline = ctx.pipeline;
- this.pipelineOptions = ctx.pipelineOptions;
- this.builder = ctx.builder;
- this.pValueToProducer = ctx.pValueToProducer;
- this.pValueToTag = ctx.pValueToTag;
- this.loopVertexStack = ctx.loopVertexStack;
- this.primitiveTransformToTranslator = ctx.primitiveTransformToTranslator;
- this.compositeTransformToTranslator = ctx.compositeTransformToTranslator;
-
- this.communicationPatternSelector = selector;
- }
-
- /**
- * Selects appropriate translator to translate the given hierarchy.
- *
- * @param transformVertex the Beam transform hierarchy to translate
- */
- private void translate(final TransformVertex transformVertex) {
- final boolean isComposite = transformVertex instanceof
CompositeTransformVertex;
- final PTransform<?, ?> transform =
transformVertex.getNode().getTransform();
- if (transform == null) {
- // root node
- topologicalTranslator(this, (CompositeTransformVertex)
transformVertex, null);
- return;
- }
-
- Class<?> clazz = transform.getClass();
- while (true) {
- final Method translator = (isComposite ?
compositeTransformToTranslator : primitiveTransformToTranslator)
- .get(clazz);
- if (translator == null) {
- if (clazz.getSuperclass() != null) {
- clazz = clazz.getSuperclass();
- continue;
- }
- throw new UnsupportedOperationException(String.format("%s transform
%s is not supported",
- isComposite ? "Composite" : "Primitive",
transform.getClass().getCanonicalName()));
- } else {
- try {
- translator.setAccessible(true);
- translator.invoke(null, this, transformVertex, transform);
- break;
- } catch (final IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (final InvocationTargetException | RuntimeException e) {
- throw new RuntimeException(String.format(
- "Translator %s have failed to translate %s", translator,
transform), e);
- }
- }
- }
- }
-
- /**
- * Add IR vertex to the builder.
- *
- * @param vertex IR vertex to add
- */
- private void addVertex(final IRVertex vertex) {
- builder.addVertex(vertex, loopVertexStack);
- }
-
- /**
- * Add IR edge to the builder.
- *
- * @param dst the destination IR vertex.
- * @param input the {@link PValue} {@code dst} consumes
- */
- private void addEdgeTo(final IRVertex dst, final PValue input) {
- final IRVertex src = pValueToProducer.get(input);
- if (src == null) {
- try {
- throw new RuntimeException(String.format("Cannot find a vertex that
emits pValue %s, "
- + "while PTransform %s is known to produce it.", input,
root.getPrimitiveProducerOf(input)));
- } catch (final RuntimeException e) {
- throw new RuntimeException(String.format("Cannot find a vertex that
emits pValue %s, "
- + "and the corresponding PTransform was not found", input));
- }
- }
- final CommunicationPatternProperty.Value communicationPattern =
communicationPatternSelector.apply(src, dst);
- if (communicationPattern == null) {
- throw new RuntimeException(String.format("%s have failed to determine
communication pattern "
- + "for an edge from %s to %s", communicationPatternSelector, src,
dst));
- }
- final IREdge edge = new IREdge(communicationPattern, src, dst);
- final Coder coder;
- final Coder windowCoder;
- if (input instanceof PCollection) {
- coder = ((PCollection) input).getCoder();
- windowCoder = ((PCollection)
input).getWindowingStrategy().getWindowFn().windowCoder();
- } else if (input instanceof PCollectionView) {
- coder = getCoderForView((PCollectionView) input, root);
- windowCoder = ((PCollectionView) input).getPCollection()
- .getWindowingStrategy().getWindowFn().windowCoder();
- } else {
- throw new RuntimeException(String.format("While adding an edge from
%s, to %s, coder for PValue %s cannot "
- + "be determined", src, dst, input));
- }
-
- edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
-
- if (coder instanceof KvCoder) {
- Coder keyCoder = ((KvCoder) coder).getKeyCoder();
- edge.setProperty(KeyEncoderProperty.of(new
BeamEncoderFactory(keyCoder)));
- edge.setProperty(KeyDecoderProperty.of(new
BeamDecoderFactory(keyCoder)));
- }
-
- edge.setProperty(EncoderProperty.of(
- new BeamEncoderFactory<>(WindowedValue.getFullCoder(coder,
windowCoder))));
- edge.setProperty(DecoderProperty.of(
- new BeamDecoderFactory<>(WindowedValue.getFullCoder(coder,
windowCoder))));
-
- if (pValueToTag.containsKey(input)) {
-
edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
- }
-
- if (input instanceof PCollectionView) {
- edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView)
input));
- }
-
- builder.connectVertices(edge);
- }
-
- /**
- * Registers a {@link PValue} as a main output from the specified {@link
IRVertex}.
- *
- * @param irVertex the IR vertex
- * @param output the {@link PValue} {@code irVertex} emits as main output
- */
- private void registerMainOutputFrom(final IRVertex irVertex, final PValue
output) {
- pValueToProducer.put(output, irVertex);
- }
-
- /**
- * Registers a {@link PValue} as an additional output from the specified
{@link IRVertex}.
- *
- * @param irVertex the IR vertex
- * @param output the {@link PValue} {@code irVertex} emits as additional
output
- * @param tag the {@link TupleTag} associated with this additional output
- */
- private void registerAdditionalOutputFrom(final IRVertex irVertex, final
PValue output, final TupleTag<?> tag) {
- pValueToTag.put(output, tag);
- pValueToProducer.put(output, irVertex);
- }
+ private static Map<TupleTag<?>, Coder<?>> getOutputCoders(final
AppliedPTransform<?, ?, ?> ptransform) {
+ return ptransform
+ .getOutputs()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue() instanceof PCollection)
+ .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection)
e.getValue()).getCoder()));
}
/**
- * Default implementation for {@link CommunicationPatternProperty.Value}
selector.
+ * Create a group by key transform.
+ * It returns GroupByKeyAndWindowDoFnTransform if window function is not
default.
+ * @param ctx translation context
+ * @param beamNode transform vertex
+ * @return group by key transform
*/
- private static final class DefaultCommunicationPatternSelector
- implements BiFunction<IRVertex, IRVertex,
CommunicationPatternProperty.Value> {
-
- private static final DefaultCommunicationPatternSelector INSTANCE = new
DefaultCommunicationPatternSelector();
-
- @Override
- public CommunicationPatternProperty.Value apply(final IRVertex src, final
IRVertex dst) {
- final Class<?> constructUnionTableFn;
- try {
- constructUnionTableFn =
Class.forName("org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn");
- } catch (final ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
-
- final Transform srcTransform = src instanceof OperatorVertex ?
((OperatorVertex) src).getTransform() : null;
- final Transform dstTransform = dst instanceof OperatorVertex ?
((OperatorVertex) dst).getTransform() : null;
- final DoFn srcDoFn = srcTransform instanceof DoFnTransform ?
((DoFnTransform) srcTransform).getDoFn() : null;
+ private static Transform createGBKTransform(
+ final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode) {
+ final AppliedPTransform pTransform =
beamNode.toAppliedPTransform(ctx.getPipeline());
+ final PCollection<?> mainInput = (PCollection<?>)
+
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+ final TupleTag mainOutputTag = new TupleTag<>();
- if (srcDoFn != null && srcDoFn.getClass().equals(constructUnionTableFn))
{
- return CommunicationPatternProperty.Value.Shuffle;
- }
- if (srcTransform instanceof FlattenTransform) {
- return CommunicationPatternProperty.Value.OneToOne;
- }
- if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
- || dstTransform instanceof GroupByKeyTransform) {
- return CommunicationPatternProperty.Value.Shuffle;
- }
- if (dstTransform instanceof CreateViewTransform) {
- return CommunicationPatternProperty.Value.BroadCast;
- }
- return CommunicationPatternProperty.Value.OneToOne;
+ if (isBatch(beamNode, ctx.getPipeline())) {
+ return new GroupByKeyTransform();
+ } else {
+ return new GroupByKeyAndWindowDoFnTransform(
+ getOutputCoders(pTransform),
+ mainOutputTag,
+ Collections.emptyList(), /* GBK does not have additional outputs */
+ mainInput.getWindowingStrategy(),
+ Collections.emptyList(), /* GBK does not have additional side inputs
*/
+ ctx.getPipelineOptions(),
+ SystemReduceFn.buffering(mainInput.getCoder()));
}
}
- /**
- * A {@link CommunicationPatternProperty.Value} selector which always emits
OneToOne.
- */
- private static final class OneToOneCommunicationPatternSelector
- implements BiFunction<IRVertex, IRVertex,
CommunicationPatternProperty.Value> {
- private static final OneToOneCommunicationPatternSelector INSTANCE = new
OneToOneCommunicationPatternSelector();
-
- @Override
- public CommunicationPatternProperty.Value apply(final IRVertex src, final
IRVertex dst) {
- return CommunicationPatternProperty.Value.OneToOne;
- }
+ private static boolean isBatch(final TransformHierarchy.Node beamNode, final
Pipeline pipeline) {
Review comment:
isBatch -> isGlobalWindow? because batch can also use non-global windows
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services