taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r234858450
########## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java ########## @@ -91,69 +92,83 @@ void addVertex(final IRVertex vertex) { builder.addVertex(vertex, loopVertexStack); } + /** + * Say the dstIRVertex consumes three views: view0, view1, and view2. + * + * We translate that as the following: + * view0 -> SideInputTransform(index=0) -> + * view1 -> SideInputTransform(index=1) -> dstIRVertex(with a map from indices to PCollectionViews) + * view2 -> SideInputTransform(index=2) -> + * + * @param dstVertex vertex. + * @param sideInputs of the vertex. + */ + void addSideInputEdges(final IRVertex dstVertex, final Map<Integer, PCollectionView<?>> sideInputs) { + for (final Map.Entry<Integer, PCollectionView<?>> entry : sideInputs.entrySet()) { + final int index = entry.getKey(); + final PCollectionView view = entry.getValue(); + + final IRVertex srcVertex = pValueToProducerVertex.get(view); + final IRVertex sideInputTransformVertex = new OperatorVertex(new SideInputTransform(index)); + addVertex(sideInputTransformVertex); + final Coder viewCoder = getCoderForView(view, this); + final Coder windowCoder = view.getPCollection().getWindowingStrategy().getWindowFn().windowCoder(); + + // First edge: view to transform + final IREdge firstEdge = + new IREdge(CommunicationPatternProperty.Value.OneToOne, srcVertex, sideInputTransformVertex); + addEdge(firstEdge, viewCoder, windowCoder); + + // Second edge: transform to the dstIRVertex + final IREdge secondEdge = + new IREdge(CommunicationPatternProperty.Value.OneToOne, sideInputTransformVertex, dstVertex); + final Coder sideInputElementCoder = SideInputCoder.of(WindowedValue.getFullCoder(viewCoder, windowCoder)); + secondEdge.setProperty(EncoderProperty.of(new BeamEncoderFactory(sideInputElementCoder))); + secondEdge.setProperty(DecoderProperty.of(new BeamDecoderFactory(sideInputElementCoder))); + builder.connectVertices(secondEdge); + } + } + /** * Add IR 
edge to the builder. * * @param dst the destination IR vertex. * @param input the {@link PValue} {@code dst} consumes */ void addEdgeTo(final IRVertex dst, final PValue input) { - final Coder coder; if (input instanceof PCollection) { - coder = ((PCollection) input).getCoder(); - } else if (input instanceof PCollectionView) { - coder = getCoderForView((PCollectionView) input, this); - } else { - throw new RuntimeException(String.format("While adding an edge to %s, coder for PValue %s cannot " - + "be determined", dst, input)); - } - addEdgeTo(dst, input, coder); - } + final Coder elementCoder = ((PCollection) input).getCoder(); + final Coder windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder(); + final IRVertex src = pValueToProducerVertex.get(input); + if (src == null) { + throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input)); + } - void addEdgeTo(final IRVertex dst, final PValue input, final Coder elementCoder) { - final IRVertex src = pValueToProducerVertex.get(input); - if (src == null) { - throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input)); - } + final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst); + final IREdge edge = new IREdge(communicationPattern, src, dst); - final Coder windowCoder; - final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst); - final IREdge edge = new IREdge(communicationPattern, src, dst); + if (pValueToTag.containsKey(input)) { + edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId())); + } - if (pValueToTag.containsKey(input)) { - edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId())); - } - if (input instanceof PCollectionView) { - edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) input)); - } - if (input instanceof PCollection) { - windowCoder = ((PCollection) 
input).getWindowingStrategy().getWindowFn().windowCoder(); - } else if (input instanceof PCollectionView) { - windowCoder = ((PCollectionView) input).getPCollection() - .getWindowingStrategy().getWindowFn().windowCoder(); + addEdge(edge, elementCoder, windowCoder); } else { - throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot " - + "be determined", src, dst, input)); + throw new IllegalStateException(input.toString()); } - - addEdgeTo(edge, elementCoder, windowCoder); } - void addEdgeTo(final IREdge edge, - final Coder elementCoder, - final Coder windowCoder) { + void addEdge(final IREdge edge, final Coder elementCoder, final Coder windowCoder) { + // TODO key extractor only when many to many Review comment: Please add the JIRA issue number to this TODO comment. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services