johnyangk closed pull request #62: [NEMO-71] Add NodeNamesAssignmentPass and
Example Application
URL: https://github.com/apache/incubator-nemo/pull/62
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index b7cc03e7e..6826174c2 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -33,6 +33,7 @@
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.CommandLine;
import org.apache.reef.util.EnvironmentUtils;
@@ -95,12 +96,15 @@ public static void main(final String[] args) throws
Exception {
final Configuration driverConf = getDriverConf(builtJobConf);
final Configuration driverNcsConf = getDriverNcsConf();
final Configuration driverMessageConfg = getDriverMessageConf();
- final Configuration executorResourceConfig =
getExecutorResourceConf(builtJobConf);
+ final Configuration executorResourceConfig = getJSONConf(builtJobConf,
JobConf.ExecutorJSONPath.class,
+ JobConf.ExecutorJSONContents.class);
+ final Configuration bandwidthConfig = getJSONConf(builtJobConf,
JobConf.BandwidthJSONPath.class,
+ JobConf.BandwidthJSONContents.class);
final Configuration clientConf = getClientConf();
// Merge Job and Driver Confs
jobAndDriverConf = Configurations.merge(builtJobConf, driverConf,
driverNcsConf, driverMessageConfg,
- executorResourceConfig, driverRPCServer.getListeningConfiguration());
+ executorResourceConfig, bandwidthConfig,
driverRPCServer.getListeningConfiguration());
// Get DeployMode Conf
deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf),
clientConf);
@@ -237,7 +241,8 @@ public static Configuration getJobConf(final String[] args)
throws IOException,
cl.registerShortNameOfClass(JobConf.OptimizationPolicy.class);
cl.registerShortNameOfClass(JobConf.DeployMode.class);
cl.registerShortNameOfClass(JobConf.DriverMemMb.class);
- cl.registerShortNameOfClass(JobConf.ExecutorJsonPath.class);
+ cl.registerShortNameOfClass(JobConf.ExecutorJSONPath.class);
+ cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class);
cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
cl.registerShortNameOfClass(JobConf.MaxScheduleAttempt.class);
@@ -277,19 +282,25 @@ private static Configuration getDeployModeConf(final
Configuration jobConf) thro
}
/**
- * Get executor resource configuration.
+ * Read json file and return its contents as configuration parameter.
*
- * @param jobConf job configuration to get executor json path.
- * @return executor resource configuration.
+ * @param jobConf job configuration to get json path.
+ * @param pathParameter named parameter represents path to the json file, or
an empty string
+ * @param contentsParameter named parameter represents contents of the file
+ * @return configuration with contents of the file, or an empty string as
value for {@code contentsParameter}
* @throws InjectionException exception while injection.
*/
- private static Configuration getExecutorResourceConf(final Configuration
jobConf) throws InjectionException {
+ private static Configuration getJSONConf(final Configuration jobConf,
+ final Class<? extends Name<String>>
pathParameter,
+ final Class<? extends Name<String>>
contentsParameter)
+ throws InjectionException {
final Injector injector = TANG.newInjector(jobConf);
try {
- final String path =
injector.getNamedInstance(JobConf.ExecutorJsonPath.class);
- final String contents = new String(Files.readAllBytes(Paths.get(path)),
StandardCharsets.UTF_8);
+ final String path = injector.getNamedInstance(pathParameter);
+ final String contents = path.isEmpty() ? ""
+ : new String(Files.readAllBytes(Paths.get(path)),
StandardCharsets.UTF_8);
return TANG.newConfigurationBuilder()
- .bindNamedParameter(JobConf.ExecutorJsonContents.class, contents)
+ .bindNamedParameter(contentsParameter, contents)
.build();
} catch (final IOException e) {
throw new RuntimeException(e);
diff --git
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java
new file mode 100644
index 000000000..82cddd911
--- /dev/null
+++
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+import java.util.HashMap;
+
+/**
+ * Map between node name and the number of parallelism which will run on the
node.
+ */
+public final class NodeNamesProperty extends
VertexExecutionProperty<HashMap<String, Integer>> {
+ /**
+ * Default constructor.
+ * @param value the map from location to the number of Task that must be
executed on the node
+ */
+ public NodeNamesProperty(final HashMap<String, Integer> value) {
+ super(value);
+ }
+
+ /**
+ * Static method for constructing {@link NodeNamesProperty}.
+ * @param value the map from location to the number of Task that must be
executed on the node
+ * @return the execution property
+ */
+ public static NodeNamesProperty of(final HashMap<String, Integer> value) {
+ return new NodeNamesProperty(value);
+ }
+}
diff --git a/compiler/optimizer/pom.xml b/compiler/optimizer/pom.xml
index 1c6525417..8195526ff 100644
--- a/compiler/optimizer/pom.xml
+++ b/compiler/optimizer/pom.xml
@@ -36,6 +36,11 @@ limitations under the License.
</repositories>
<dependencies>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>${commons-math.version}</version>
+ </dependency>
<dependency>
<groupId>edu.snu.nemo</groupId>
<artifactId>nemo-common</artifactId>
@@ -46,5 +51,15 @@ limitations under the License.
<artifactId>nemo-runtime-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
new file mode 100644
index 000000000..8c6d8a15a
--- /dev/null
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.NodeNamesProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.commons.math3.optim.BaseOptimizer;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.commons.math3.optim.linear.*;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.apache.commons.math3.util.Incrementor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.*;
+
+/**
+ * Computes and assigns appropriate share of nodes to each irVertex to
minimize shuffle time,
+ * with respect to bandwidth restrictions of nodes. If bandwidth information
is not given, this pass does nothing.
+ * This pass follows task assignment of Iridium-style optimization.
+ * http://pages.cs.wisc.edu/~akella/papers/gda-sigcomm15.pdf
+ *
+ * <h3>Assumptions</h3>
+ * This pass assumes no skew in input or intermediate data, so that the number
of Task assigned to a node
+ * is proportional to the data size handled by the node.
+ * Also, this pass assumes stages with empty map as {@link NodeNamesProperty}
are assigned to nodes evenly.
+ * For example, if source splits are not distributed evenly, any source
location-aware scheduling policy will
+ * assign TaskGroups unevenly.
+ * Also, this pass assumes network bandwidth to be the bottleneck. Each node
should have enough capacity to run
+ * TaskGroups immediately as scheduler attempts to schedule a TaskGroup.
+ */
+public final class NodeNamesAssignmentPass extends AnnotatingPass {
+
+ // Index of the objective parameter, in the coefficient vector
+ private static final int OBJECTIVE_COEFFICIENT_INDEX = 0;
+ private static final Logger LOG =
LoggerFactory.getLogger(NodeNamesAssignmentPass.class);
+ private static final HashMap<String, Integer> EMPTY_MAP = new HashMap<>();
+
+ private static String bandwidthSpecificationString = "";
+
+
+ /**
+ * Default constructor.
+ */
+ public NodeNamesAssignmentPass() {
+ super(NodeNamesProperty.class,
Collections.singleton(ParallelismProperty.class));
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ if (bandwidthSpecificationString.isEmpty()) {
+ dag.topologicalDo(irVertex ->
irVertex.setProperty(NodeNamesProperty.of(EMPTY_MAP)));
+ } else {
+ assignNodeShares(dag,
BandwidthSpecification.fromJsonString(bandwidthSpecificationString));
+ }
+ return dag;
+ }
+
+ public static void setBandwidthSpecificationString(final String value) {
+ bandwidthSpecificationString = value;
+ }
+
+ private static void assignNodeShares(
+ final DAG<IRVertex, IREdge> dag,
+ final BandwidthSpecification bandwidthSpecification) {
+ dag.topologicalDo(irVertex -> {
+ final Collection<IREdge> inEdges = dag.getIncomingEdgesOf(irVertex);
+ final int parallelism =
irVertex.getPropertyValue(ParallelismProperty.class)
+ .orElseThrow(() -> new RuntimeException("Parallelism property
required"));
+ if (inEdges.size() == 0) {
+ // The stage is root stage.
+ // Fall back to setting even distribution
+ final HashMap<String, Integer> shares = new HashMap<>();
+ final List<String> nodes = bandwidthSpecification.getNodes();
+ final int defaultShare = parallelism / nodes.size();
+ final int remainder = parallelism % nodes.size();
+ for (int i = 0; i < nodes.size(); i++) {
+ shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0));
+ }
+ irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares));
+ } else if (isOneToOneEdge(inEdges)) {
+ final Optional<NodeNamesProperty> property =
dag.getIncomingEdgesOf(irVertex).iterator().next()
+ .getExecutionProperties().get(NodeNamesProperty.class);
+ irVertex.getExecutionProperties().put(property.get());
+ } else {
+ // This IRVertex has shuffle inEdge(s), or has multiple inEdges.
+ final Map<String, Integer> parentLocationShares = new HashMap<>();
+ for (final IREdge edgeToIRVertex : dag.getIncomingEdgesOf(irVertex)) {
+ final IRVertex parentVertex = edgeToIRVertex.getSrc();
+ final Map<String, Integer> shares =
parentVertex.getPropertyValue(NodeNamesProperty.class).get();
+ for (final Map.Entry<String, Integer> element : shares.entrySet()) {
+ parentLocationShares.putIfAbsent(element.getKey(), 0);
+ parentLocationShares.put(element.getKey(),
+ element.getValue() +
parentLocationShares.get(element.getKey()));
+ }
+ }
+ final double[] ratios = optimize(bandwidthSpecification,
parentLocationShares);
+ final HashMap<String, Integer> shares = new HashMap<>();
+ for (int i = 0; i < bandwidthSpecification.getNodes().size(); i++) {
+ shares.put(bandwidthSpecification.getNodes().get(i), (int)
(ratios[i] * parallelism));
+ }
+ int remainder = parallelism - shares.values().stream().mapToInt(i ->
i).sum();
+ for (final String nodeName : shares.keySet()) {
+ if (remainder == 0) {
+ break;
+ }
+ shares.put(nodeName, shares.get(nodeName) + 1);
+ remainder--;
+ }
+ irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares));
+ }
+ });
+ }
+
+ /**
+ * @param inEdges list of inEdges to the specific irVertex
+ * @return true if and only if the irVertex has one OneToOne edge
+ */
+ private static boolean isOneToOneEdge(final Collection<IREdge> inEdges) {
+ return inEdges.size() == 1 && inEdges.iterator().next()
+ .getPropertyValue(DataCommunicationPatternProperty.class).get()
+ .equals(DataCommunicationPatternProperty.Value.OneToOne);
+ }
+
+ /**
+ * Computes share of parallelism that each node is responsible for.
+ * @param bandwidthSpecification provides bandwidth information between nodes
+ * @param parentNodeShares shares of parallelism for the parent vertex
+ * @return array of fractions of parallelism that each node is responsible
for
+ */
+ private static double[] optimize(final BandwidthSpecification
bandwidthSpecification,
+ final Map<String, Integer>
parentNodeShares) {
+ final int parentParallelism =
parentNodeShares.values().stream().mapToInt(i -> i).sum();
+ final List<String> nodeNames = bandwidthSpecification.getNodes();
+ final List<LinearConstraint> constraints = new ArrayList<>();
+ final int coefficientVectorSize = nodeNames.size() + 1;
+
+ for (int i = 0; i < nodeNames.size(); i++) {
+ final String nodeName = nodeNames.get(i);
+ final int nodeCoefficientIndex = i + 1;
+ final int parentParallelismOnThisLocation =
parentNodeShares.get(nodeName);
+
+ // Upload bandwidth
+ final double[] uploadCoefficientVector = new
double[coefficientVectorSize];
+ uploadCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] =
bandwidthSpecification.up(nodeName);
+ uploadCoefficientVector[nodeCoefficientIndex] =
parentParallelismOnThisLocation;
+ constraints.add(new LinearConstraint(uploadCoefficientVector,
Relationship.GEQ,
+ parentParallelismOnThisLocation));
+
+ // Download bandwidth
+ final double[] downloadCoefficientVector = new
double[coefficientVectorSize];
+ downloadCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] =
bandwidthSpecification.down(nodeName);
+ downloadCoefficientVector[nodeCoefficientIndex] =
parentParallelismOnThisLocation - parentParallelism;
+ constraints.add(new LinearConstraint(downloadCoefficientVector,
Relationship.GEQ, 0));
+
+ // The coefficient is non-negative
+ final double[] nonNegativeCoefficientVector = new
double[coefficientVectorSize];
+ nonNegativeCoefficientVector[nodeCoefficientIndex] = 1;
+ constraints.add(new LinearConstraint(nonNegativeCoefficientVector,
Relationship.GEQ, 0));
+ }
+
+ // The sum of all coefficient is 1
+ final double[] sumCoefficientVector = new double[coefficientVectorSize];
+ for (int i = 0; i < nodeNames.size(); i++) {
+ sumCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX + 1 + i] = 1;
+ }
+ constraints.add(new LinearConstraint(sumCoefficientVector,
Relationship.EQ, 1));
+
+ // Objective
+ final double[] objectiveCoefficientVector = new
double[coefficientVectorSize];
+ objectiveCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] = 1;
+ final LinearObjectiveFunction objectiveFunction = new
LinearObjectiveFunction(objectiveCoefficientVector, 0);
+
+ // Solve
+ try {
+ final SimplexSolver solver = new SimplexSolver();
+ final Field iterations =
BaseOptimizer.class.getDeclaredField("iterations");
+ iterations.setAccessible(true);
+ final Incrementor incrementor = (Incrementor) iterations.get(solver);
+ incrementor.setMaximalCount(2147483647);
+ LOG.info(String.format("Max iterations: %d", solver.getMaxIterations()));
+ final PointValuePair solved = solver.optimize(
+ new LinearConstraintSet(constraints), objectiveFunction,
GoalType.MINIMIZE);
+
+ return Arrays.copyOfRange(solved.getPoint(), OBJECTIVE_COEFFICIENT_INDEX
+ 1, coefficientVectorSize);
+ } catch (final NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Bandwidth specification.
+ */
+ private static final class BandwidthSpecification {
+ private final List<String> nodeNames = new ArrayList<>();
+ private final Map<String, Integer> uplinkBandwidth = new HashMap<>();
+ private final Map<String, Integer> downlinkBandwidth = new HashMap<>();
+
+ private BandwidthSpecification() {
+ }
+
+ static BandwidthSpecification fromJsonString(final String jsonString) {
+ final BandwidthSpecification specification = new
BandwidthSpecification();
+ try {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final TreeNode jsonRootNode = objectMapper.readTree(jsonString);
+ for (int i = 0; i < jsonRootNode.size(); i++) {
+ final TreeNode locationNode = jsonRootNode.get(i);
+ final String name =
locationNode.get("name").traverse().nextTextValue();
+ final int up = locationNode.get("up").traverse().getIntValue();
+ final int down = locationNode.get("down").traverse().getIntValue();
+ specification.nodeNames.add(name);
+ specification.uplinkBandwidth.put(name, up);
+ specification.downlinkBandwidth.put(name, down);
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ return specification;
+ }
+
+ int up(final String nodeName) {
+ return uplinkBandwidth.get(nodeName);
+ }
+
+ int down(final String nodeName) {
+ return downlinkBandwidth.get(nodeName);
+ }
+
+ List<String> getNodes() {
+ return nodeNames;
+ }
+ }
+}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index a30abec15..dc8abfcb9 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -39,6 +39,7 @@ public PrimitiveCompositePass() {
new CompressionPass(),
new DecompressionPass(),
new SourceLocationAwareSchedulingPass(),
+ new NodeNamesAssignmentPass(),
new ExecutorSlotCompliancePass()
));
}
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 1e4ef4dec..bed8099d9 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -115,12 +115,27 @@
public final class DriverMemMb implements Name<Integer> {
}
+ /**
+ * Path to the JSON file that specifies bandwidth between locations.
+ */
+ @NamedParameter(doc = "Path to the JSON file that specifies bandwidth
between locations",
+ short_name = "bandwidth_json", default_value = "")
+ public final class BandwidthJSONPath implements Name<String> {
+ }
+
/**
* Path to the JSON file that specifies resource layout.
*/
@NamedParameter(doc = "Path to the JSON file that specifies resources for
executors", short_name = "executor_json",
default_value = "examples/resources/sample_executor_resources.json")
- public final class ExecutorJsonPath implements Name<String> {
+ public final class ExecutorJSONPath implements Name<String> {
+ }
+
+ /**
+ * Contents of the JSON file that specifies bandwidth between locations.
+ */
+ @NamedParameter(doc = "Contents of JSON file that specifies bandwidth
between locations")
+ public final class BandwidthJSONContents implements Name<String> {
}
/**
@@ -135,7 +150,7 @@
* Contents of the JSON file that specifies resource layout.
*/
@NamedParameter(doc = "Contents of JSON file that specifies resources for
executors")
- public final class ExecutorJsonContents implements Name<String> {
+ public final class ExecutorJSONContents implements Name<String> {
}
/**
diff --git
a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java
new file mode 100644
index 000000000..1f2f9900a
--- /dev/null
+++
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An app that analyzes data flow from network trace.
+ * Each line in the output file represents a host, containing the standard
deviation of the lengths of packets
+ * that flows into the host (reads input0 file), and the standard deviation of
the lengths of packets
+ * that flows out from the host (reads input1 file).
+ */
+public final class NetworkTraceAnalysis {
+ /**
+ * Private constructor.
+ */
+ private NetworkTraceAnalysis() {
+ }
+
+ /**
+ * Main function for the Beam program.
+ * @param args arguments.
+ */
+ public static void main(final String[] args) {
+ final String input0FilePath = args[0];
+ final String input1FilePath = args[1];
+ final String outputFilePath = args[2];
+ final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+ options.setRunner(NemoPipelineRunner.class);
+ options.setJobName("NetworkTraceAnalysis");
+
+ // Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds
"192.168.3.1", "192.168.0.2" and "29"
+ final Pattern pattern = Pattern.compile(" *\\d+ +[0-9.]+ +([0-9.]+) ->
([0-9.]+) +.*Len=(\\d+)");
+
+ final SimpleFunction<String, Boolean> filter = new SimpleFunction<String,
Boolean>() {
+ @Override
+ public Boolean apply(final String line) {
+ return pattern.matcher(line).find();
+ }
+ };
+ final SimpleFunction<KV<String, Iterable<KV<String, Long>>>, KV<String,
Long>> mapToStdev
+ = new SimpleFunction<KV<String, Iterable<KV<String, Long>>>,
KV<String, Long>>() {
+ @Override
+ public KV<String, Long> apply(final KV<String, Iterable<KV<String,
Long>>> kv) {
+ return KV.of(kv.getKey(), stdev(kv.getValue()));
+ }
+ };
+
+ final Pipeline p = Pipeline.create(options);
+ final PCollection<KV<String, Long>> in0 = GenericSourceSink.read(p,
input0FilePath)
+ .apply(Filter.by(filter))
+ .apply(MapElements.via(new SimpleFunction<String, KV<String,
KV<String, Long>>>() {
+ @Override
+ public KV<String, KV<String, Long>> apply(final String line) {
+ final Matcher matcher = pattern.matcher(line);
+ matcher.find();
+ return KV.of(matcher.group(2), KV.of(matcher.group(1),
Long.valueOf(matcher.group(3))));
+ }
+ }))
+ .apply(GroupByKey.create())
+ .apply(MapElements.via(mapToStdev));
+ final PCollection<KV<String, Long>> in1 = GenericSourceSink.read(p,
input1FilePath)
+ .apply(Filter.by(filter))
+ .apply(MapElements.via(new SimpleFunction<String, KV<String,
KV<String, Long>>>() {
+ @Override
+ public KV<String, KV<String, Long>> apply(final String line) {
+ final Matcher matcher = pattern.matcher(line);
+ matcher.find();
+ return KV.of(matcher.group(1), KV.of(matcher.group(2),
Long.valueOf(matcher.group(3))));
+ }
+ }))
+ .apply(GroupByKey.create())
+ .apply(MapElements.via(mapToStdev));
+ final TupleTag<Long> tag0 = new TupleTag<>();
+ final TupleTag<Long> tag1 = new TupleTag<>();
+ final PCollection<KV<String, CoGbkResult>> joined =
+ KeyedPCollectionTuple.of(tag0, in0).and(tag1,
in1).apply(CoGroupByKey.create());
+ final PCollection<String> result = joined
+ .apply(MapElements.via(new SimpleFunction<KV<String, CoGbkResult>,
String>() {
+ @Override
+ public String apply(final KV<String, CoGbkResult> kv) {
+ final long source = getLong(kv.getValue().getAll(tag0));
+ final long destination = getLong(kv.getValue().getAll(tag1));
+ final String intermediate = kv.getKey();
+ return new
StringBuilder(intermediate).append(",").append(source).append(",")
+ .append(destination).toString();
+ }
+ }));
+ GenericSourceSink.write(result, outputFilePath);
+ p.run();
+ }
+
+ private static long getLong(final Iterable<Long> data) {
+ for (final long datum : data) {
+ return datum;
+ }
+ return 0;
+ }
+
+ private static long stdev(final Iterable<KV<String, Long>> data) {
+ final StandardDeviation stdev = new StandardDeviation();
+ final List<Long> elements = new ArrayList<>();
+ for (final KV<String, Long> e : data) {
+ elements.add(e.getValue());
+ }
+ return Math.round(stdev.evaluate(elements.stream().mapToDouble(e ->
e).toArray()));
+ }
+}
diff --git
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java
new file mode 100644
index 000000000..f5e344c3a
--- /dev/null
+++
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.test.ArgBuilder;
+import edu.snu.nemo.common.test.ExampleTestUtil;
+import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.SailfishPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class NetworkTraceAnalysisITCase {
+ private static final int TIMEOUT = 120000;
+ private static ArgBuilder builder;
+ private static final String fileBasePath = System.getProperty("user.dir") +
"/../resources/";
+
+ private static final String inputFileName0 = "sample_input_network0";
+ private static final String inputFileName1 = "sample_input_network1";
+ private static final String outputFileName = "sample_output_network";
+ private static final String testResourceFileName = "test_output_network";
+ private static final String executorResourceFileName = fileBasePath +
"beam_sample_executor_resources.json";
+ private static final String inputFilePath0 = fileBasePath + inputFileName0;
+ private static final String inputFilePath1 = fileBasePath + inputFileName1;
+ private static final String outputFilePath = fileBasePath + outputFileName;
+
+ @Before
+ public void setUp() throws Exception {
+ builder = new ArgBuilder()
+ .addResourceJson(executorResourceFileName)
+ .addUserMain(NetworkTraceAnalysis.class.getCanonicalName())
+ .addUserArgs(inputFilePath0, inputFilePath1, outputFilePath);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName,
testResourceFileName);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
+ }
+
+ @Test(timeout = TIMEOUT)
+ public void test() throws Exception {
+ JobLauncher.main(builder
+ .addJobId(NetworkTraceAnalysisITCase.class.getSimpleName())
+
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
+
+ @Test (timeout = TIMEOUT)
+ public void testSailfish() throws Exception {
+ JobLauncher.main(builder
+ .addJobId(NetworkTraceAnalysisITCase.class.getSimpleName() +
"_sailfish")
+
.addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
+
+ @Test (timeout = TIMEOUT)
+ public void testPado() throws Exception {
+ JobLauncher.main(builder
+ .addJobId(NetworkTraceAnalysisITCase.class.getSimpleName() + "_pado")
+
.addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
+}
diff --git a/examples/resources/sample_input_network0
b/examples/resources/sample_input_network0
new file mode 100644
index 000000000..55176d77b
--- /dev/null
+++ b/examples/resources/sample_input_network0
@@ -0,0 +1,8 @@
+1 0.0 192.168.0.1 -> 192.168.0.2 Len=32
+2 0.0 192.168.1.1 -> 192.168.0.2 Len=31
+3 0.0 192.168.2.1 -> 192.168.0.2 Len=30
+4 0.0 192.168.3.1 -> 192.168.0.2 Len=29
+5 0.0 192.168.4.1 -> 192.168.0.2 Len=28
+6 0.0 192.168.5.1 -> 192.168.0.3 Len=27
+7 0.0 192.168.5.1 -> 192.168.0.3 Len=26
+8 0.0 192.168.1.1 -> 192.168.0.3 Len=25
diff --git a/examples/resources/sample_input_network1
b/examples/resources/sample_input_network1
new file mode 100644
index 000000000..f126482a0
--- /dev/null
+++ b/examples/resources/sample_input_network1
@@ -0,0 +1,8 @@
+1 0.0 192.168.0.2 -> 192.168.1.10 Len=32
+2 0.0 192.168.0.2 -> 192.168.1.10 Len=31
+3 0.0 192.168.0.2 -> 192.168.2.10 Len=30
+4 0.0 192.168.0.2 -> 192.168.2.10 Len=29
+5 0.0 192.168.0.2 -> 192.168.3.10 Len=16
+6 0.0 192.168.0.3 -> 192.168.3.10 Len=15
+7 0.0 192.168.0.3 -> 192.168.4.10 Len=14
+8 0.0 192.168.0.3 -> 192.168.4.10 Len=13
diff --git a/examples/resources/test_output_network b/examples/resources/test_output_network
new file mode 100644
index 000000000..a6f411528
--- /dev/null
+++ b/examples/resources/test_output_network
@@ -0,0 +1,2 @@
+192.168.0.2,2,7
+192.168.0.3,1,1
diff --git a/pom.xml b/pom.xml
index 42ad7abe7..aa8837deb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@ limitations under the License.
<netty.version>4.1.16.Final</netty.version>
<jetty-server.version>9.4.10.v20180503</jetty-server.version>
<jetty-servlet.version>9.4.10.v20180503</jetty-servlet.version>
+ <commons-math.version>3.6.1</commons-math.version>
<slf4j.version>1.7.20</slf4j.version>
<!-- Tests -->
<mockito.version>2.13.0</mockito.version>
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index d6c6ccc89..f4493e8f7 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -16,6 +16,7 @@
package edu.snu.nemo.driver;
import edu.snu.nemo.common.ir.IdManager;
+import
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.NodeNamesAssignmentPass;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -81,7 +82,8 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner,
final LocalAddressProvider localAddressProvider,
final JobMessageObserver client,
final ClientRPC clientRPC,
- @Parameter(JobConf.ExecutorJsonContents.class) final
String resourceSpecificationString,
+ @Parameter(JobConf.ExecutorJSONContents.class) final
String resourceSpecificationString,
+ @Parameter(JobConf.BandwidthJSONContents.class) final
String bandwidthString,
@Parameter(JobConf.JobId.class) final String jobId,
@Parameter(JobConf.FileDirectory.class) final String
localDirectory,
@Parameter(JobConf.GlusterVolumeDirectory.class) final
String glusterDirectory) {
@@ -96,6 +98,8 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner,
this.glusterDirectory = glusterDirectory;
this.handler = new RemoteClientMessageLoggingHandler(client);
this.clientRPC = clientRPC;
+ // TODO #69: Support job-wide execution property
+ NodeNamesAssignmentPass.setBandwidthSpecificationString(bandwidthString);
clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG,
message ->
startSchedulingUserApplication(message.getLaunchDAG().getDag()));
// Send DriverStarted message to the client
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
new file mode 100644
index 000000000..584cd67d8
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.NodeNamesProperty;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.*;
+
+/**
+ * This constraint is to follow {@link NodeNamesProperty}.
+ *
+ * <p>The property value maps each node name to the number of tasks to place on that node. Node names are
+ * visited in natural {@link String} order and task indices are handed out contiguously: with
+ * {@code {"a": 2, "b": 1}}, tasks 0 and 1 go to node "a" and task 2 goes to node "b".
+ * An empty property value lets the task run on any node.
+ */
+@AssociatedProperty(NodeNamesProperty.class)
+public final class NodeShareSchedulingConstraint implements SchedulingConstraint {
+
+ @Inject
+ private NodeShareSchedulingConstraint() {
+ }
+
+ /**
+ * Finds the node that the task with the given index is assigned to.
+ *
+ * @param propertyValue map from node name to the number of tasks assigned to that node.
+ * @param taskIndex the task index, as extracted from the task id.
+ * @return the name of the node the task should run on.
+ * @throws IllegalStateException if {@code taskIndex} is not smaller than the sum of all per-node counts.
+ */
+ private String getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
+ final List<String> nodeNames = new ArrayList<>(propertyValue.keySet());
+ nodeNames.sort(Comparator.naturalOrder());
+ int index = taskIndex;
+ for (final String nodeName : nodeNames) {
+ final int numTasksOnNode = propertyValue.get(nodeName);
+ // Indices below this node's share belong to it; otherwise skip past its share and try the next node.
+ if (index < numTasksOnNode) {
+ return nodeName;
+ }
+ index -= numTasksOnNode;
+ }
+ throw new IllegalStateException("Detected excessive parallelism which NodeNamesProperty does not cover");
+ }
+
+ /**
+ * Checks whether the executor is on the node that {@link NodeNamesProperty} assigns to the task.
+ *
+ * @param executor the candidate executor.
+ * @param task the task to schedule.
+ * @return true if the property value is empty, or if the executor's node name equals the assigned node name.
+ * @throws RuntimeException if the task carries no {@link NodeNamesProperty}.
+ */
+ @Override
+ public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
+ final Map<String, Integer> propertyValue = task.getPropertyValue(NodeNamesProperty.class)
+ .orElseThrow(() -> new RuntimeException("NodeNamesProperty expected"));
+ if (propertyValue.isEmpty()) {
+ return true;
+ }
+ return executor.getNodeName().equals(
+ getNodeName(propertyValue, RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())));
+ }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 97ab5544c..639e77579 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -39,10 +39,12 @@
private SchedulingConstraintRegistry(
final ContainerTypeAwareSchedulingConstraint
containerTypeAwareSchedulingConstraint,
final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
- final SourceLocationAwareSchedulingConstraint
sourceLocationAwareSchedulingConstraint) {
+ final SourceLocationAwareSchedulingConstraint
sourceLocationAwareSchedulingConstraint,
+ final NodeShareSchedulingConstraint nodeShareSchedulingConstraint) {
registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
registerSchedulingConstraint(freeSlotSchedulingConstraint);
registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+ registerSchedulingConstraint(nodeShareSchedulingConstraint);
}
/**
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 41edef3f7..b6ac067ed 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -28,21 +28,21 @@
@Test
public void testDisaggregationPolicy() {
final Policy disaggregationPolicy = new DisaggregationPolicy();
- assertEquals(16, disaggregationPolicy.getCompileTimePasses().size());
+ assertEquals(17, disaggregationPolicy.getCompileTimePasses().size());
assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
}
@Test
public void testPadoPolicy() {
final Policy padoPolicy = new PadoPolicy();
- assertEquals(18, padoPolicy.getCompileTimePasses().size());
+ assertEquals(19, padoPolicy.getCompileTimePasses().size());
assertEquals(0, padoPolicy.getRuntimePasses().size());
}
@Test
public void testDataSkewPolicy() {
final Policy dataSkewPolicy = new DataSkewPolicy();
- assertEquals(20, dataSkewPolicy.getCompileTimePasses().size());
+ assertEquals(21, dataSkewPolicy.getCompileTimePasses().size());
assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services