This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 2ec8f28 [NEMO-214] Use Jackson for building JSON, fix malformed JSON
issue on Windows (#118)
2ec8f28 is described below
commit 2ec8f28cf6b5c8f2c64d2d82f2cc4664f93150d0
Author: Jangho Seo <[email protected]>
AuthorDate: Tue Sep 11 13:29:31 2018 +0900
[NEMO-214] Use Jackson for building JSON, fix malformed JSON issue on
Windows (#118)
JIRA: [NEMO-214: Use Jackson for building JSON, fix malformed JSON issue on
Windows](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-214)
**Major changes:**
- Use Jackson for building string representation of DAG.
- Use Jackson for building string representation of ExecutionPropertyMap.
**Minor changes to note:**
- HashRange, StateMachine, TaskState, BlockState, PlanState, and
StageState: Eliminate unnecessary usage of StringBuffer and StringBuilder.
**Tests for the changes:**
- N/A
**Other comments:**
- This fixes 'malformed JSON' issue on Windows environments.
Closes #118
---
.../java/org/apache/nemo/common/HashRange.java | 5 +-
.../java/org/apache/nemo/common/StateMachine.java | 21 ++------
.../main/java/org/apache/nemo/common/dag/DAG.java | 59 ++++++++++++----------
.../main/java/org/apache/nemo/common/dag/Edge.java | 6 ++-
.../java/org/apache/nemo/common/dag/Vertex.java | 7 ++-
.../org/apache/nemo/common/ir/edge/IREdge.java | 14 ++---
.../ir/executionproperty/ExecutionPropertyMap.java | 24 ++++-----
.../org/apache/nemo/common/ir/vertex/IRVertex.java | 15 +++---
.../apache/nemo/common/ir/vertex/LoopVertex.java | 43 +++++++---------
.../nemo/common/ir/vertex/OperatorVertex.java | 13 ++---
.../beam/source/BeamBoundedSourceVertex.java | 13 ++---
.../nemo/runtime/common/plan/RuntimeEdge.java | 13 ++---
.../org/apache/nemo/runtime/common/plan/Stage.java | 17 ++++---
.../apache/nemo/runtime/common/plan/StageEdge.java | 19 +++----
.../nemo/runtime/common/state/BlockState.java | 4 +-
.../nemo/runtime/common/state/PlanState.java | 4 +-
.../nemo/runtime/common/state/StageState.java | 4 +-
.../nemo/runtime/common/state/TaskState.java | 4 +-
.../master/resource/ExecutorRepresenter.java | 14 ++---
.../runtime/master/scheduler/BatchScheduler.java | 4 +-
20 files changed, 142 insertions(+), 161 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/HashRange.java
b/common/src/main/java/org/apache/nemo/common/HashRange.java
index f803268..81dd9bd 100644
--- a/common/src/main/java/org/apache/nemo/common/HashRange.java
+++ b/common/src/main/java/org/apache/nemo/common/HashRange.java
@@ -96,10 +96,7 @@ public final class HashRange implements KeyRange<Integer> {
*/
@Override
public String toString() {
- final StringBuilder printableKeyRange = new StringBuilder("[");
- printableKeyRange.append(rangeBeginInclusive()).append(",
").append(rangeEndExclusive()).append(")");
-
- return printableKeyRange.toString();
+ return String.format("[%d, %d)", rangeBeginInclusive, rangeEndExclusive());
}
@Override
diff --git a/common/src/main/java/org/apache/nemo/common/StateMachine.java
b/common/src/main/java/org/apache/nemo/common/StateMachine.java
index 19adc17..91c1ef7 100644
--- a/common/src/main/java/org/apache/nemo/common/StateMachine.java
+++ b/common/src/main/java/org/apache/nemo/common/StateMachine.java
@@ -49,15 +49,8 @@ public final class StateMachine {
*/
public synchronized void checkState(final Enum expectedCurrentState) {
if (!currentState.stateEnum.equals(expectedCurrentState)) {
- final String exceptionMessage = new StringBuilder()
- .append("The expected current state is ")
- .append(expectedCurrentState)
- .append(" but the actual state is ")
- .append(currentState).append('\n')
- .append(getPossibleTransitionsFromCurrentState())
- .toString();
-
- throw new IllegalStateException(exceptionMessage);
+ throw new IllegalStateException(String.format("The expected state is %s
but the actual state is %s\n%s",
+ expectedCurrentState, currentState,
getPossibleTransitionsFromCurrentState()));
}
}
@@ -75,14 +68,8 @@ public final class StateMachine {
final State toState = stateMap.get(state);
if (!currentState.isLegalTransition(state)) {
- final String exceptionMessage = new StringBuilder()
- .append("Illegal transition from ")
- .append(currentState)
- .append(" to ")
- .append(toState).append('\n')
- .append(getPossibleTransitionsFromCurrentState())
- .toString();
- throw new IllegalStateTransitionException(new
Exception(exceptionMessage));
+ throw new IllegalStateTransitionException(new
Exception(String.format("Illegal transition from %s to %s\n%s",
+ currentState, toState, getPossibleTransitionsFromCurrentState())));
}
currentState = toState;
diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAG.java
b/common/src/main/java/org/apache/nemo/common/dag/DAG.java
index 7a642a3..506e8bc 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/DAG.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/DAG.java
@@ -15,6 +15,9 @@
*/
package org.apache.nemo.common.dag;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.exception.IllegalEdgeOperationException;
import org.apache.nemo.common.exception.IllegalVertexOperationException;
import org.apache.nemo.common.ir.vertex.LoopVertex;
@@ -349,36 +352,40 @@ public final class DAG<V extends Vertex, E extends
Edge<V>> implements Serializa
return this.loopStackDepthMap.get(v.getId());
}
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{\"vertices\": [");
- boolean isFirstVertex = true;
+ /**
+ * @return {@link com.fasterxml.jackson.databind.JsonNode} for this DAG.
+ */
+ public ObjectNode asJsonNode() {
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectNode node = mapper.createObjectNode();
+
+ final ArrayNode verticesNode = mapper.createArrayNode();
for (final V vertex : vertices) {
- if (!isFirstVertex) {
- sb.append(", ");
- }
- isFirstVertex = false;
- sb.append("{\"id\": \"").append(vertex.getId());
- sb.append("\", \"properties\": ").append(vertex.propertiesToJSON());
- sb.append("}");
+ final ObjectNode vertexNode = mapper.createObjectNode();
+ vertexNode.put("id", vertex.getId());
+ vertexNode.set("properties", vertex.getPropertiesAsJsonNode());
+ verticesNode.add(vertexNode);
}
- sb.append("], \"edges\": [");
- boolean isFirstEdge = true;
- for (final List<E> edgeList : incomingEdges.values()) {
- for (final E edge : edgeList) {
- if (!isFirstEdge) {
- sb.append(", ");
- }
- isFirstEdge = false;
- sb.append("{\"src\": \"").append(edge.getSrc().getId());
- sb.append("\", \"dst\": \"").append(edge.getDst().getId());
- sb.append("\", \"properties\": ").append(edge.propertiesToJSON());
- sb.append("}");
+ node.set("vertices", verticesNode);
+
+ final ArrayNode edgesNode = mapper.createArrayNode();
+ for (final List<E> edges : incomingEdges.values()) {
+ for (final E edge : edges) {
+ final ObjectNode edgeNode = mapper.createObjectNode();
+ edgeNode.put("src", edge.getSrc().getId());
+ edgeNode.put("dst", edge.getDst().getId());
+ edgeNode.set("properties", edge.getPropertiesAsJsonNode());
+ edgesNode.add(edgeNode);
}
+
}
- sb.append("]}");
- return sb.toString();
+ node.set("edges", edgesNode);
+ return node;
+ }
+
+ @Override
+ public String toString() {
+ return asJsonNode().toString();
}
public static final String EMPTY_DAG_DIRECTORY = "";
diff --git a/common/src/main/java/org/apache/nemo/common/dag/Edge.java
b/common/src/main/java/org/apache/nemo/common/dag/Edge.java
index 9506d40..298568d 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/Edge.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/Edge.java
@@ -17,6 +17,8 @@ package org.apache.nemo.common.dag;
import java.io.Serializable;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,7 +79,7 @@ public class Edge<V extends Vertex> implements Serializable {
* @return JSON representation of additional properties
*/
@SuppressWarnings("checkstyle:designforextension")
- public String propertiesToJSON() {
- return "{}";
+ public JsonNode getPropertiesAsJsonNode() {
+ return JsonNodeFactory.instance.objectNode();
}
}
diff --git a/common/src/main/java/org/apache/nemo/common/dag/Vertex.java
b/common/src/main/java/org/apache/nemo/common/dag/Vertex.java
index 9a40740..ade35df 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/Vertex.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/Vertex.java
@@ -15,6 +15,9 @@
*/
package org.apache.nemo.common.dag;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+
import java.io.Serializable;
/**
@@ -47,7 +50,7 @@ public abstract class Vertex implements Serializable {
* @return JSON representation of additional properties
*/
@SuppressWarnings("checkstyle:designforextension")
- public String propertiesToJSON() {
- return "{}";
+ public JsonNode getPropertiesAsJsonNode() {
+ return JsonNodeFactory.instance.objectNode();
}
}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/IREdge.java
b/common/src/main/java/org/apache/nemo/common/ir/edge/IREdge.java
index 7e2f401..9810ee4 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/IREdge.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/IREdge.java
@@ -15,6 +15,8 @@
*/
package org.apache.nemo.common.ir.edge;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.dag.Edge;
import org.apache.nemo.common.ir.IdManager;
import
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
@@ -127,11 +129,11 @@ public final class IREdge extends Edge<IRVertex> {
}
@Override
- public String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{\"id\": \"").append(getId());
- sb.append("\", \"executionProperties\": ").append(executionProperties);
- sb.append("}");
- return sb.toString();
+ public ObjectNode getPropertiesAsJsonNode() {
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectNode node = mapper.createObjectNode();
+ node.put("id", getId());
+ node.set("executionProperties", executionProperties.asJsonNode());
+ return node;
}
}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
b/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
index ac62aa6..89700c4 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
@@ -15,6 +15,8 @@
*/
package org.apache.nemo.common.ir.executionproperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.common.exception.CompileTimeOptimizationException;
@@ -186,22 +188,16 @@ public final class ExecutionPropertyMap<T extends
ExecutionProperty> implements
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{");
- boolean isFirstPair = true;
+ return asJsonNode().toString();
+ }
+
+ public ObjectNode asJsonNode() {
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectNode node = mapper.createObjectNode();
for (final Map.Entry<Class<? extends ExecutionProperty>, T> entry :
properties.entrySet()) {
- if (!isFirstPair) {
- sb.append(", ");
- }
- isFirstPair = false;
- sb.append("\"");
- sb.append(entry.getKey().getCanonicalName());
- sb.append("\": \"");
- sb.append(entry.getValue().getValue());
- sb.append("\"");
+ node.put(entry.getKey().getCanonicalName(),
entry.getValue().getValue().toString());
}
- sb.append("}");
- return sb.toString();
+ return node;
}
// Apache commons-lang 3 Equals/HashCodeBuilder template.
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/IRVertex.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/IRVertex.java
index 69aba94..12c3756 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/IRVertex.java
@@ -15,6 +15,8 @@
*/
package org.apache.nemo.common.ir.vertex;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.ir.IdManager;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.dag.Vertex;
@@ -109,12 +111,13 @@ public abstract class IRVertex extends Vertex implements
Cloneable<IRVertex> {
}
/**
- * @return IRVertex properties in String form.
+ * @return IRVertex properties as JSON node.
*/
- protected final String irVertexPropertiesToString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("\"class\": \"").append(this.getClass().getSimpleName());
- sb.append("\", \"executionProperties\": ").append(executionProperties);
- return sb.toString();
+ protected final ObjectNode getIRVertexPropertiesAsJsonNode() {
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectNode node = mapper.createObjectNode();
+ node.put("class", getClass().getSimpleName());
+ node.set("executionProperties", executionProperties.asJsonNode());
+ return node;
}
}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
index 67e2fe1..ba6617f 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
@@ -15,6 +15,9 @@
*/
package org.apache.nemo.common.ir.vertex;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.edge.IREdge;
@@ -25,7 +28,6 @@ import
org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProper
import java.io.Serializable;
import java.util.*;
import java.util.function.IntPredicate;
-import java.util.stream.Collectors;
/**
* IRVertex that contains a partial DAG that is iterative.
@@ -313,22 +315,16 @@ public final class LoopVertex extends IRVertex {
}
@Override
- public String propertiesToJSON() {
- final List<String> edgeMappings =
edgeWithLoopToEdgeWithInternalVertex.entrySet().stream()
- .map(entry -> String.format("\"%s\": \"%s\"", entry.getKey().getId(),
entry.getValue().getId()))
- .collect(Collectors.toList());
- final StringBuilder sb = new StringBuilder();
- sb.append("{");
- sb.append(irVertexPropertiesToString());
- sb.append(", \"remainingIteration\": ");
- sb.append(this.maxNumberOfIterations);
- sb.append(", \"DAG\": ");
- sb.append(getDAG());
- sb.append(", \"dagIncomingEdges\":
").append(crossingEdgesToJSON(dagIncomingEdges));
- sb.append(", \"dagOutgoingEdges\":
").append(crossingEdgesToJSON(dagOutgoingEdges));
- sb.append(", \"edgeWithLoopToEdgeWithInternalVertex\":
{").append(String.join(", ", edgeMappings));
- sb.append("}}");
- return sb.toString();
+ public ObjectNode getPropertiesAsJsonNode() {
+ final ObjectNode node = getIRVertexPropertiesAsJsonNode();
+ node.put("remainingIteration", maxNumberOfIterations);
+ node.set("DAG", getDAG().asJsonNode());
+ node.set("dagIncomingEdges", crossingEdgesToJSON(dagIncomingEdges));
+ node.set("dagOutgoingEdges", crossingEdgesToJSON(dagOutgoingEdges));
+ final ObjectNode edgeMappings =
node.putObject("edgeWithLoopToEdgeWithInternalVertex");
+ edgeWithLoopToEdgeWithInternalVertex.entrySet()
+ .forEach(entry -> edgeMappings.put(entry.getKey().getId(),
entry.getValue().getId()));
+ return node;
}
/**
@@ -336,15 +332,12 @@ public final class LoopVertex extends IRVertex {
* @param map map of the crossing edges.
* @return a string of JSON showing the crossing edges.
*/
- private static String crossingEdgesToJSON(final Map<IRVertex, Set<IREdge>>
map) {
- final ArrayList<String> vertices = new ArrayList<>();
+ private static ObjectNode crossingEdgesToJSON(final Map<IRVertex,
Set<IREdge>> map) {
+ final ObjectNode node = JsonNodeFactory.instance.objectNode();
map.forEach(((irVertex, irEdges) -> {
- final StringBuilder sb = new StringBuilder();
- sb.append("\"").append(irVertex.getId()).append("\": [");
- final List<String> edges = irEdges.stream().map(e -> "\"" + e.getId() +
"\"").collect(Collectors.toList());
- sb.append(String.join(", ", edges)).append("]");
- vertices.add(sb.toString());
+ final ArrayNode vertexNode = node.putArray(irVertex.getId());
+ irEdges.forEach(e -> vertexNode.add(e.getId()));
}));
- return "{" + String.join(", ", vertices) + "}";
+ return node;
}
}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
index ab28c90..8f932e2 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
@@ -15,6 +15,7 @@
*/
package org.apache.nemo.common.ir.vertex;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.ir.vertex.transform.Transform;
/**
@@ -55,13 +56,9 @@ public final class OperatorVertex extends IRVertex {
}
@Override
- public String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{");
- sb.append(irVertexPropertiesToString());
- sb.append(", \"transform\": \"");
- sb.append(transform);
- sb.append("\"}");
- return sb.toString();
+ public ObjectNode getPropertiesAsJsonNode() {
+ final ObjectNode node = getIRVertexPropertiesAsJsonNode();
+ node.put("transform", transform.toString());
+ return node;
}
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 0472e1c..f17ab98 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -15,6 +15,7 @@
*/
package org.apache.nemo.compiler.frontend.beam.source;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.ir.Readable;
import java.io.IOException;
@@ -82,14 +83,10 @@ public final class BeamBoundedSourceVertex<O> extends
SourceVertex<O> {
}
@Override
- public String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{");
- sb.append(irVertexPropertiesToString());
- sb.append(", \"source\": \"");
- sb.append(sourceDescription);
- sb.append("\"}");
- return sb.toString();
+ public ObjectNode getPropertiesAsJsonNode() {
+ final ObjectNode node = getIRVertexPropertiesAsJsonNode();
+ node.put("source", sourceDescription);
+ return node;
}
/**
diff --git
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
index 695ce90..a87966f 100644
---
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
+++
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
@@ -15,6 +15,8 @@
*/
package org.apache.nemo.runtime.common.plan;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.dag.Edge;
import org.apache.nemo.common.dag.Vertex;
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
@@ -70,11 +72,10 @@ public class RuntimeEdge<V extends Vertex> extends Edge<V> {
*/
@Override
@SuppressWarnings("checkstyle:designforextension")
- public String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{\"runtimeEdgeId\": \"").append(getId());
- sb.append("\", \"executionProperties\": ").append(executionProperties);
- sb.append("}");
- return sb.toString();
+ public ObjectNode getPropertiesAsJsonNode() {
+ final ObjectNode node = JsonNodeFactory.instance.objectNode();
+ node.put("runtimeEdgeId", getId());
+ node.set("executionProperties", executionProperties.asJsonNode());
+ return node;
}
}
diff --git
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Stage.java
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Stage.java
index 603bb54..54048b9 100644
---
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Stage.java
+++
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/Stage.java
@@ -15,6 +15,8 @@
*/
package org.apache.nemo.runtime.common.plan;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.Vertex;
import org.apache.nemo.common.ir.Readable;
@@ -115,13 +117,12 @@ public final class Stage extends Vertex {
}
@Override
- public String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{\"scheduleGroup\": ").append(getScheduleGroup());
- sb.append(", \"irDag\": ").append(irDag);
- sb.append(", \"parallelism\": ").append(getParallelism());
- sb.append(", \"executionProperties\": ").append(executionProperties);
- sb.append('}');
- return sb.toString();
+ public ObjectNode getPropertiesAsJsonNode() {
+ final ObjectNode node = JsonNodeFactory.instance.objectNode();
+ node.put("scheduleGroup", getScheduleGroup());
+ node.set("irDag", irDag.asJsonNode());
+ node.put("parallelism", getParallelism());
+ node.set("executionProperties", executionProperties.asJsonNode());
+ return node;
}
}
diff --git
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StageEdge.java
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StageEdge.java
index 596d922..f776309 100644
---
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StageEdge.java
+++
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/StageEdge.java
@@ -15,6 +15,8 @@
*/
package org.apache.nemo.runtime.common.plan;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.nemo.common.HashRange;
import org.apache.nemo.common.KeyRange;
@@ -108,19 +110,18 @@ public final class StageEdge extends RuntimeEdge<Stage> {
}
@Override
- public String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{\"runtimeEdgeId\": \"").append(getId());
- sb.append("\", \"executionProperties\":
").append(getExecutionProperties());
- sb.append(", \"externalSrcVertexId\": \"").append(srcVertex.getId());
- sb.append("\", \"externalDstVertexId\": \"").append(dstVertex.getId());
- sb.append("\"}");
- return sb.toString();
+ public ObjectNode getPropertiesAsJsonNode() {
+ final ObjectNode node = JsonNodeFactory.instance.objectNode();
+ node.put("runtimeEdgeId", getId());
+ node.set("executionProperties", getExecutionProperties().asJsonNode());
+ node.put("externalSrcVertexId", srcVertex.getId());
+ node.put("externalDstVertexId", dstVertex.getId());
+ return node;
}
@Override
public String toString() {
- return propertiesToJSON();
+ return getPropertiesAsJsonNode().toString();
}
/**
diff --git
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/BlockState.java
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/BlockState.java
index c4c8d17..e9a5660 100644
---
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/BlockState.java
+++
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/BlockState.java
@@ -63,8 +63,6 @@ public final class BlockState {
@Override
public String toString() {
- final StringBuffer sb = new StringBuffer();
- sb.append(stateMachine.getCurrentState());
- return sb.toString();
+ return stateMachine.getCurrentState().toString();
}
}
diff --git
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/PlanState.java
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/PlanState.java
index 9d8dc44..b621ba6 100644
---
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/PlanState.java
+++
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/PlanState.java
@@ -65,8 +65,6 @@ public final class PlanState {
@Override
public String toString() {
- final StringBuffer sb = new StringBuffer();
- sb.append(stateMachine.getCurrentState());
- return sb.toString();
+ return stateMachine.getCurrentState().toString();
}
}
diff --git
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/StageState.java
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/StageState.java
index 580a667..ec2ef69 100644
---
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/StageState.java
+++
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/StageState.java
@@ -66,8 +66,6 @@ public final class StageState {
@Override
public String toString() {
- final StringBuffer sb = new StringBuffer();
- sb.append(stateMachine.getCurrentState());
- return sb.toString();
+ return stateMachine.getCurrentState().toString();
}
}
diff --git
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/TaskState.java
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/TaskState.java
index d1f7c90..6047ac2 100644
---
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/TaskState.java
+++
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/state/TaskState.java
@@ -89,8 +89,6 @@ public final class TaskState {
@Override
public String toString() {
- final StringBuffer sb = new StringBuffer();
- sb.append(stateMachine.getCurrentState());
- return sb.toString();
+ return stateMachine.getCurrentState().toString();
}
}
diff --git
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
index 4833189..a1f49c4 100644
---
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
+++
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -15,6 +15,8 @@
*/
package org.apache.nemo.runtime.master.resource;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.ByteString;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -215,12 +217,12 @@ public final class ExecutorRepresenter {
@Override
public String toString() {
- final StringBuffer sb = new StringBuffer("ExecutorRepresenter{");
- sb.append("executorId='").append(executorId).append('\'');
- sb.append(", runningTasks=").append(getRunningTasks());
- sb.append(", failedTasks=").append(failedTasks);
- sb.append('}');
- return sb.toString();
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectNode node = mapper.createObjectNode();
+ node.put("executorId", executorId);
+ node.put("runningTasks", getRunningTasks().toString());
+ node.put("failedTasks", failedTasks.toString());
+ return node.toString();
}
/**
diff --git
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index 152175b..a9c4d39 100644
---
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -189,8 +189,8 @@ public final class BatchScheduler implements Scheduler {
onTaskExecutionOnHold(executorId, taskId);
break;
case FAILED:
- throw new UnrecoverableFailureException(new Exception(new
StringBuffer().append("The plan failed on Task #")
- .append(taskId).append(" in Executor
").append(executorId).toString()));
+ throw new UnrecoverableFailureException(new
Exception(String.format("The plan failed on %s in %s",
+ taskId, executorId)));
case READY:
case EXECUTING:
throw new RuntimeException("The states READY/EXECUTING cannot occur at
this point");