This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 8a7ae953f3a9de59733df65e27d67a41269431c7 Merge: e419597 18b61c1 Author: John Yang <[email protected]> AuthorDate: Wed Oct 17 10:12:58 2018 +0900 merge .../java/org/apache/nemo/common/HashRange.java | 5 +- .../java/org/apache/nemo/common/StateMachine.java | 21 +- .../main/java/org/apache/nemo/common/dag/DAG.java | 59 +-- .../main/java/org/apache/nemo/common/dag/Edge.java | 6 +- .../java/org/apache/nemo/common/dag/Vertex.java | 7 +- .../org/apache/nemo/common/ir/edge/IREdge.java | 14 +- .../ir/executionproperty/ExecutionPropertyMap.java | 24 +- .../org/apache/nemo/common/ir/vertex/IRVertex.java | 15 +- .../apache/nemo/common/ir/vertex/LoopVertex.java | 43 +- .../nemo/common/ir/vertex/OperatorVertex.java | 13 +- compiler/frontend/beam/pom.xml | 5 + .../compiler/frontend/beam/BeamKeyExtractor.java | 7 +- .../compiler/frontend/beam/NemoPipelineRunner.java | 5 +- .../compiler/frontend/beam/PipelineTranslator.java | 141 +++--- .../beam/source/BeamBoundedSourceVertex.java | 46 +- .../BroadcastVariableSideInputReader.java | 60 +++ .../beam/transform/CreateViewTransform.java | 20 +- .../beam/transform/DefaultOutputManager.java | 51 +++ .../frontend/beam/transform/DoFnTransform.java | 157 +++++++ .../frontend/beam/transform/DoTransform.java | 490 --------------------- .../frontend/beam/transform/FlattenTransform.java | 8 +- .../beam/transform/GroupByKeyTransform.java | 15 +- .../frontend/beam/transform/WindowFnTransform.java | 98 +++++ .../frontend/beam/transform/WindowTransform.java | 61 --- .../nemo/compiler/optimizer/PairKeyExtractor.java} | 17 +- .../compiletime/reshaping/SkewReshapingPass.java | 3 +- .../compiler/backend/nemo/DAGConverterTest.java | 5 +- .../frontend/beam/transform/DoFnTransformTest.java | 287 ++++++++++++ .../beam/MultinomialLogisticRegression.java | 6 +- .../nemo/examples/beam/BeamSimpleSumSQLITCase.java | 68 +++ .../nemo/runtime/common/plan/RuntimeEdge.java | 13 +- .../org/apache/nemo/runtime/common/plan/Stage.java | 17 +- 
.../apache/nemo/runtime/common/plan/StageEdge.java | 19 +- .../nemo/runtime/common/state/BlockState.java | 4 +- .../nemo/runtime/common/state/PlanState.java | 4 +- .../nemo/runtime/common/state/StageState.java | 4 +- .../nemo/runtime/common/state/TaskState.java | 4 +- .../master/resource/ExecutorRepresenter.java | 14 +- .../runtime/master/scheduler/BatchScheduler.java | 4 +- .../runtime/master/scheduler/TaskRetryTest.java | 4 +- 40 files changed, 1033 insertions(+), 811 deletions(-) diff --cc compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java index 5d77296,2486a00..729aaad --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java @@@ -237,8 -276,7 +276,8 @@@ public final class PipelineTranslato final boolean handlesBeamRow = Stream .concat(transformVertex.getNode().getInputs().values().stream(), transformVertex.getNode().getOutputs().values().stream()) - .filter(pValue -> getCoder(pValue, ctx.pipeline) instanceof KvCoder) - .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV ++ .filter(pValue -> getCoder(pValue, ctx.root) instanceof KvCoder) + .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output of combine should be KV .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor)); if (handlesBeamRow) { @@@ -490,17 -529,18 +530,17 @@@ + "for an edge from %s to %s", communicationPatternSelector, src, dst)); } final IREdge edge = new IREdge(communicationPattern, src, dst); - final Coder<?> coder; + final Coder coder; + final Coder windowCoder; if (input instanceof PCollection) { coder = ((PCollection) input).getCoder(); + windowCoder = ((PCollection) 
input).getWindowingStrategy().getWindowFn().windowCoder(); } else if (input instanceof PCollectionView) { - coder = getCoderForView((PCollectionView) input, pipeline); + coder = getCoderForView((PCollectionView) input, root); - windowCoder = ((PCollectionView) input).getPCollection() - .getWindowingStrategy().getWindowFn().windowCoder(); ++ windowCoder = ((PCollectionView) input).getPCollection().getWindowingStrategy().getWindowFn().windowCoder(); } else { - coder = null; - } - if (coder == null) { throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot " - + "be determined", src, dst, input)); + + "be determined", src, dst, input)); } edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
