This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 2b3a315 [Nemo-220] Move PIPELINE variable to TranslationContext in
PipelineTranslator (#145)
2b3a315 is described below
commit 2b3a315a48b539df03fcafa14daa7eea592c568e
Author: WooYeon Lee <[email protected]>
AuthorDate: Sat Nov 3 16:24:14 2018 +0900
[Nemo-220] Move PIPELINE variable to TranslationContext in
PipelineTranslator (#145)
JIRA: [NEMO-220: Move PIPELINE variable to TranslationContext in
PipelineTranslator](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-220)
**Major changes:**
- Move PIPELINE variable to TranslationContext in PipelineTranslator
**Minor changes to note:**
- PipelineTranslator no longer implements BiFunction
**Tests for the changes:**
- N/A
---
.../compiler/frontend/beam/PipelineTranslator.java | 30 +++++++++++-----------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index ee10b21..3fd9d2b 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -55,7 +55,6 @@ import java.lang.annotation.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -66,8 +65,7 @@ import java.util.stream.Stream;
* For a {@link CompositeTransformVertex}, it defines how to setup and clear
{@link TranslationContext}
* before start translating inner Beam transform hierarchy.
*/
-public final class PipelineTranslator
- implements BiFunction<CompositeTransformVertex, PipelineOptions,
DAG<IRVertex, IREdge>> {
+public final class PipelineTranslator {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineTranslator.class.getName());
@@ -76,9 +74,6 @@ public final class PipelineTranslator
private final Map<Class<? extends PTransform>, Method>
primitiveTransformToTranslator = new HashMap<>();
private final Map<Class<? extends PTransform>, Method>
compositeTransformToTranslator = new HashMap<>();
- // TODO #220: Move this variable to TranslationContext
- private static final AtomicReference<Pipeline> PIPELINE = new
AtomicReference<>();
-
/**
* Static translator method.
* @param pipeline the original root
@@ -89,8 +84,7 @@ public final class PipelineTranslator
public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
final CompositeTransformVertex
root,
final PipelineOptions
pipelineOptions) {
- PIPELINE.set(pipeline);
- return INSTANCE.apply(root, pipelineOptions);
+ return INSTANCE.translateToIRDAG(root, pipeline, pipelineOptions);
}
/**
@@ -144,7 +138,7 @@ public final class PipelineTranslator
private static DoFnTransform createDoFnTransform(final TranslationContext
ctx,
final
PrimitiveTransformVertex transformVertex) {
try {
- final AppliedPTransform pTransform =
transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+ final AppliedPTransform pTransform =
transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
final TupleTag mainOutputTag =
ParDoTranslation.getMainOutputTag(pTransform);
final List<PCollectionView<?>> sideInputs =
ParDoTranslation.getSideInputs(pTransform);
@@ -221,7 +215,7 @@ public final class PipelineTranslator
private static Transform createGBKTransform(
final TranslationContext ctx,
final TransformVertex transformVertex) {
- final AppliedPTransform pTransform =
transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+ final AppliedPTransform pTransform =
transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
final PCollection<?> mainInput = (PCollection<?>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
final TupleTag mainOutputTag = new TupleTag<>();
@@ -372,12 +366,12 @@ public final class PipelineTranslator
ctx.loopVertexStack.pop();
}
- @Override
- public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline,
- final PipelineOptions pipelineOptions) {
- final TranslationContext ctx = new TranslationContext(pipeline,
primitiveTransformToTranslator,
+ private DAG<IRVertex, IREdge> translateToIRDAG(final
CompositeTransformVertex vertex,
+ final Pipeline pipeline,
+ final PipelineOptions
pipelineOptions) {
+ final TranslationContext ctx = new TranslationContext(vertex, pipeline,
primitiveTransformToTranslator,
compositeTransformToTranslator,
DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
- ctx.translate(pipeline);
+ ctx.translate(vertex);
return ctx.builder.build();
}
@@ -453,23 +447,28 @@ public final class PipelineTranslator
private final Map<PValue, TupleTag<?>> pValueToTag;
private final Stack<LoopVertex> loopVertexStack;
private final BiFunction<IRVertex, IRVertex,
CommunicationPatternProperty.Value> communicationPatternSelector;
+ private final Pipeline pipeline;
+
private final Map<Class<? extends PTransform>, Method>
primitiveTransformToTranslator;
private final Map<Class<? extends PTransform>, Method>
compositeTransformToTranslator;
/**
* @param root the root to translate
+ * @param pipeline the pipeline to translate
* @param primitiveTransformToTranslator provides translators for
PrimitiveTransform
* @param compositeTransformToTranslator provides translators for
CompositeTransform
* @param selector provides {@link CommunicationPatternProperty.Value} for
IR edges
* @param pipelineOptions {@link PipelineOptions}
*/
private TranslationContext(final CompositeTransformVertex root,
+ final Pipeline pipeline,
final Map<Class<? extends PTransform>, Method>
primitiveTransformToTranslator,
final Map<Class<? extends PTransform>, Method>
compositeTransformToTranslator,
final BiFunction<IRVertex, IRVertex,
CommunicationPatternProperty.Value> selector,
final PipelineOptions pipelineOptions) {
this.root = root;
+ this.pipeline = pipeline;
this.builder = new DAGBuilder<>();
this.pValueToProducer = new HashMap<>();
this.pValueToTag = new HashMap<>();
@@ -489,6 +488,7 @@ public final class PipelineTranslator
private TranslationContext(final TranslationContext ctx,
final BiFunction<IRVertex, IRVertex,
CommunicationPatternProperty.Value> selector) {
this.root = ctx.root;
+ this.pipeline = ctx.pipeline;
this.pipelineOptions = ctx.pipelineOptions;
this.builder = ctx.builder;
this.pValueToProducer = ctx.pValueToProducer;