wonook closed pull request #109: [NEMO-89] Clean up IRVertex#getClone() URL: https://github.com/apache/incubator-nemo/pull/109
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/src/main/java/edu/snu/nemo/common/Cloneable.java b/common/src/main/java/edu/snu/nemo/common/Cloneable.java new file mode 100644 index 000000000..4a75b7d96 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/Cloneable.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common; + +/** + * This interface is implemented by objects that can be cloned. + * <p> + * This interface also overcomes the drawback of {@link java.lang.Cloneable} interface which + * doesn't have a clone method. + * Josh Bloch (a JDK author) has pointed out this in his book "Effective Java" as + * <a href="http://thefinestartist.com/effective-java/11"> Override clone judiciously </a> + * </p> + * + * @param <T> the type of objects that this class can clone + */ +public interface Cloneable<T extends Cloneable<T>> { + + /** + * Creates and returns a copy of this object. + * <p> + * The precise meaning of "copy" may depend on the class of the object. + * The general intent is that, all fields of the object are copied. + * </p> + * + * @return a clone of this object. + */ + T getClone(); +} diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java index 09ebfbff5..5810233af 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java @@ -19,6 +19,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap; import edu.snu.nemo.common.dag.Vertex; import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; +import edu.snu.nemo.common.Cloneable; import java.io.Serializable; import java.util.Optional; @@ -27,7 +28,7 @@ * The basic unit of operation in a dataflow program, as well as the most important data structure in Nemo. * An IRVertex is created and modified in the compiler, and executed in the runtime. */ -public abstract class IRVertex extends Vertex { +public abstract class IRVertex extends Vertex implements Cloneable<IRVertex> { private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties; private boolean stagePartitioned; @@ -41,9 +42,16 @@ public IRVertex() { } /** - * @return a clone elemnt of the IRVertex. + * Copy Constructor for IRVertex. + * + * @param that the source object for copying */ - public abstract IRVertex getClone(); + public IRVertex(final IRVertex that) { + super(IdManager.newVertexId()); + this.executionProperties = ExecutionPropertyMap.of(this); + that.getExecutionProperties().forEachProperties(this::setProperty); + this.stagePartitioned = that.stagePartitioned; + } /** * Static function to copy executionProperties from a vertex to the other. @@ -55,6 +63,7 @@ public final void copyExecutionPropertiesTo(final IRVertex thatVertex) { /** * Set an executionProperty of the IRVertex. + * * @param executionProperty new execution property. * @return the IRVertex with the execution property set. */ @@ -65,6 +74,7 @@ public final IRVertex setProperty(final VertexExecutionProperty<?> executionProp /** * Set an executionProperty of the IRVertex, permanently. + * * @param executionProperty new execution property. * @return the IRVertex with the execution property set. */ diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java index c73475bdd..99a418350 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java @@ -29,18 +29,28 @@ private Iterable<T> initializedSourceData; /** - * Constructor. + * Constructor for InMemorySourceVertex. + * * @param initializedSourceData the initial data object. */ public InMemorySourceVertex(final Iterable<T> initializedSourceData) { + super(); this.initializedSourceData = initializedSourceData; } + /** + * Copy Constructor for InMemorySourceVertex. + * + * @param that the source object for copying + */ + public InMemorySourceVertex(final InMemorySourceVertex that) { + super(that); + this.initializedSourceData = that.initializedSourceData; + } + @Override public InMemorySourceVertex<T> getClone() { - final InMemorySourceVertex<T> that = new InMemorySourceVertex<>(this.initializedSourceData); - this.copyExecutionPropertiesTo(that); - return that; + return new InMemorySourceVertex<>(this); } @Override diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java index 96c7fca61..7d25aeea7 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java @@ -31,59 +31,62 @@ * IRVertex that contains a partial DAG that is iterative. */ public final class LoopVertex extends IRVertex { + private static int duplicateEdgeGroupId = 0; - private final DAGBuilder<IRVertex, IREdge> builder; // Contains DAG information + // Contains DAG information + private final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(); private final String compositeTransformFullName; - - private final Map<IRVertex, Set<IREdge>> dagIncomingEdges; // for the initial iteration - private final Map<IRVertex, Set<IREdge>> iterativeIncomingEdges; // Edges from previous iterations connected internal. - private final Map<IRVertex, Set<IREdge>> nonIterativeIncomingEdges; // Edges from outside previous iterations. - private final Map<IRVertex, Set<IREdge>> dagOutgoingEdges; // for the final iteration - private final Map<IREdge, IREdge> edgeWithLoopToEdgeWithInternalVertex; - private final Map<IREdge, IREdge> edgeWithInternalVertexToEdgeWithLoop; - + // for the initial iteration + private final Map<IRVertex, Set<IREdge>> dagIncomingEdges = new HashMap<>(); + // Edges from previous iterations connected internal. + private final Map<IRVertex, Set<IREdge>> iterativeIncomingEdges = new HashMap<>(); + // Edges from outside previous iterations. + private final Map<IRVertex, Set<IREdge>> nonIterativeIncomingEdges = new HashMap<>(); + // for the final iteration + private final Map<IRVertex, Set<IREdge>> dagOutgoingEdges = new HashMap<>(); + private final Map<IREdge, IREdge> edgeWithLoopToEdgeWithInternalVertex = new HashMap<>(); + private final Map<IREdge, IREdge> edgeWithInternalVertexToEdgeWithLoop = new HashMap<>(); private Integer maxNumberOfIterations; private IntPredicate terminationCondition; /** * The LoopVertex constructor. + * * @param compositeTransformFullName full name of the composite transform. */ public LoopVertex(final String compositeTransformFullName) { super(); - this.builder = new DAGBuilder<>(); this.compositeTransformFullName = compositeTransformFullName; - this.dagIncomingEdges = new HashMap<>(); - this.iterativeIncomingEdges = new HashMap<>(); - this.nonIterativeIncomingEdges = new HashMap<>(); - this.dagOutgoingEdges = new HashMap<>(); - this.edgeWithLoopToEdgeWithInternalVertex = new HashMap<>(); - this.edgeWithInternalVertexToEdgeWithLoop = new HashMap<>(); this.maxNumberOfIterations = 1; // 1 is the default number of iterations. this.terminationCondition = (IntPredicate & Serializable) (integer -> false); // nothing much yet. } - @Override - public LoopVertex getClone() { - final LoopVertex newLoopVertex = new LoopVertex(compositeTransformFullName); - + /** + * Copy Constructor for LoopVertex. + * + * @param that the source object for copying + */ + public LoopVertex(final LoopVertex that) { + super(that); + this.compositeTransformFullName = new String(that.compositeTransformFullName); // Copy all elements to the clone - final DAG<IRVertex, IREdge> dagToCopy = this.getDAG(); + final DAG<IRVertex, IREdge> dagToCopy = that.getDAG(); dagToCopy.topologicalDo(v -> { - newLoopVertex.getBuilder().addVertex(v, dagToCopy); - dagToCopy.getIncomingEdgesOf(v).forEach(newLoopVertex.getBuilder()::connectVertices); + this.getBuilder().addVertex(v, dagToCopy); + dagToCopy.getIncomingEdgesOf(v).forEach(this.getBuilder()::connectVertices); }); - this.dagIncomingEdges.forEach(((v, es) -> es.forEach(newLoopVertex::addDagIncomingEdge))); - this.iterativeIncomingEdges.forEach((v, es) -> es.forEach(newLoopVertex::addIterativeIncomingEdge)); - this.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(newLoopVertex::addNonIterativeIncomingEdge)); - this.dagOutgoingEdges.forEach(((v, es) -> es.forEach(newLoopVertex::addDagOutgoingEdge))); - this.edgeWithLoopToEdgeWithInternalVertex.forEach((eLoop, eInternal) - -> newLoopVertex.mapEdgeWithLoop(eLoop, eInternal)); - newLoopVertex.setMaxNumberOfIterations(maxNumberOfIterations); - newLoopVertex.setTerminationCondition(terminationCondition); - - this.copyExecutionPropertiesTo(newLoopVertex); - return newLoopVertex; + that.dagIncomingEdges.forEach(((v, es) -> es.forEach(this::addDagIncomingEdge))); + that.iterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addIterativeIncomingEdge)); + that.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addNonIterativeIncomingEdge)); + that.dagOutgoingEdges.forEach(((v, es) -> es.forEach(this::addDagOutgoingEdge))); + that.edgeWithLoopToEdgeWithInternalVertex.forEach((eLoop, eInternal) -> this.mapEdgeWithLoop(eLoop, eInternal)); + this.maxNumberOfIterations = that.maxNumberOfIterations; + this.terminationCondition = that.terminationCondition; + } + + @Override + public LoopVertex getClone() { + return new LoopVertex(this); } /** diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java index 5a2ad41dc..a4a03bf3e 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java @@ -41,17 +41,29 @@ * Constructor for dynamic optimization vertex. */ public MetricCollectionBarrierVertex() { + super(); this.metricData = new HashMap<>(); this.blockIds = new ArrayList<>(); this.dagSnapshot = null; } + /** + * Constructor for dynamic optimization vertex. + * + * @param that the source object for copying + */ + public MetricCollectionBarrierVertex(final MetricCollectionBarrierVertex<K, V> that) { + super(that); + this.metricData = new HashMap<>(); + that.metricData.forEach(this.metricData::put); + this.blockIds = new ArrayList<>(); + that.blockIds.forEach(this.blockIds::add); + this.dagSnapshot = that.dagSnapshot; + } + @Override public MetricCollectionBarrierVertex getClone() { - final MetricCollectionBarrierVertex that = new MetricCollectionBarrierVertex(); - that.setDAGSnapshot(dagSnapshot); - this.copyExecutionPropertiesTo(that); - return that; + return new MetricCollectionBarrierVertex(this); } /** diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java index 2d39736af..636b8e97a 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java @@ -33,11 +33,18 @@ public OperatorVertex(final Transform t) { this.transform = t; } + /** + * Copy Constructor of OperatorVertex. + * @param that the source object for copying + */ + public OperatorVertex(final OperatorVertex that) { + super(); + this.transform = that.transform; + } + @Override public OperatorVertex getClone() { - final OperatorVertex that = new OperatorVertex(this.transform); - this.copyExecutionPropertiesTo(that); - return that; + return new OperatorVertex(this); } /** diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java index f64e17071..f1472d880 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java @@ -26,6 +26,21 @@ */ public abstract class SourceVertex<O> extends IRVertex { + /** + * Constructor for SourceVertex. + */ + public SourceVertex() { + super(); + } + + /** + * Copy Constructor for SourceVertex. + * + * @param that the source object for copying + */ + public SourceVertex(final SourceVertex that) { + super(that); + } /** * Gets parallel readables. * diff --git a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java index 88a768a02..a1daf8bec 100644 --- a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java +++ b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java @@ -113,6 +113,15 @@ public EmptySourceVertex(final String name) { this.name = name; } + /** + * Copy Constructor for EmptySourceVertex. + * + * @param that the source object for copying + */ + public EmptySourceVertex(final EmptySourceVertex that) { + this.name = new String(that.name); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(); @@ -137,7 +146,7 @@ public void clearInternalStates() { @Override public EmptySourceVertex<T> getClone() { - return new EmptySourceVertex<>(this.name); + return new EmptySourceVertex<>(this); } } diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java index 71fc5dacd..550efebcf 100644 --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java @@ -41,18 +41,29 @@ /** * Constructor of BeamBoundedSourceVertex. + * * @param source BoundedSource to read from. */ public BeamBoundedSourceVertex(final BoundedSource<O> source) { + super(); this.source = source; this.sourceDescription = source.toString(); } + /** + * Constructor of BeamBoundedSourceVertex. + * + * @param that the source object for copying + */ + public BeamBoundedSourceVertex(final BeamBoundedSourceVertex that) { + super(that); + this.source = that.source; + this.sourceDescription = that.source.toString(); + } + @Override public BeamBoundedSourceVertex getClone() { - final BeamBoundedSourceVertex that = new BeamBoundedSourceVertex<>(this.source); - this.copyExecutionPropertiesTo(that); - return that; + return new BeamBoundedSourceVertex(this); } @Override diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java index 3b058071c..925beb9c7 100644 --- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java +++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java @@ -41,6 +41,7 @@ * @param dataset Dataset to read data from. */ public SparkDatasetBoundedSourceVertex(final SparkSession sparkSession, final Dataset<T> dataset) { + super(); this.readables = new ArrayList<>(); final RDD rdd = dataset.sparkRDD(); final Partition[] partitions = rdd.getPartitions(); @@ -54,19 +55,19 @@ public SparkDatasetBoundedSourceVertex(final SparkSession sparkSession, final Da } /** - * Constructor for cloning. + * Copy Constructor for SparkDatasetBoundedSourceVertex. * - * @param readables the list of Readables to set. + * @param that the source object for copying */ - private SparkDatasetBoundedSourceVertex(final List<Readable<T>> readables) { - this.readables = readables; + public SparkDatasetBoundedSourceVertex(final SparkDatasetBoundedSourceVertex<T> that) { + super(that); + this.readables = new ArrayList<>(); + that.readables.forEach(this.readables::add); } @Override public SparkDatasetBoundedSourceVertex getClone() { - final SparkDatasetBoundedSourceVertex<T> that = new SparkDatasetBoundedSourceVertex<>((this.readables)); - this.copyExecutionPropertiesTo(that); - return that; + return new SparkDatasetBoundedSourceVertex(this); } @Override diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java index 9b2fd3807..957602477 100644 --- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java +++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java @@ -40,6 +40,7 @@ public SparkTextFileBoundedSourceVertex(final SparkContext sparkContext, final String inputPath, final int numPartitions) { + super(); this.readables = new ArrayList<>(); final Partition[] partitions = sparkContext.textFile(inputPath, numPartitions).getPartitions(); for (int i = 0; i < partitions.length; i++) { @@ -55,17 +56,17 @@ public SparkTextFileBoundedSourceVertex(final SparkContext sparkContext, /** * Constructor for cloning. * - * @param readables the list of Readables to set. + * @param that the source object for copying */ - private SparkTextFileBoundedSourceVertex(final List<Readable<String>> readables) { - this.readables = readables; + private SparkTextFileBoundedSourceVertex(final SparkTextFileBoundedSourceVertex that) { + super(that); + this.readables = new ArrayList<>(); + that.readables.forEach(this.readables::add); } @Override public SparkTextFileBoundedSourceVertex getClone() { - final SparkTextFileBoundedSourceVertex that = new SparkTextFileBoundedSourceVertex(this.readables); - this.copyExecutionPropertiesTo(that); - return that; + return new SparkTextFileBoundedSourceVertex(this); } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services