Repository: flink
Updated Branches:
  refs/heads/release-1.1 05a5f460b -> dc768d3b7
[FLINK-4875] [metrics] Use correct operator name

This closes #2676.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc768d3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc768d3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc768d3b

Branch: refs/heads/release-1.1
Commit: dc768d3b781eb0cd0baca13746da55a0bebef115
Parents: 05a5f46
Author: zentol <[email protected]>
Authored: Thu Oct 20 15:53:03 2016 +0200
Committer: zentol <[email protected]>
Committed: Tue Oct 25 10:47:00 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/graph/StreamConfig.java | 9 +++++++++
 .../streaming/api/graph/StreamingJobGraphGenerator.java | 2 ++
 .../streaming/api/operators/AbstractStreamOperator.java | 3 +--
 3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 783b3e2..eb31fda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -68,6 +68,7 @@ public class StreamConfig implements Serializable {
 private static final String EDGES_IN_ORDER = "edgesInOrder";
 private static final String OUT_STREAM_EDGES = "outStreamEdges";
 private static final String IN_STREAM_EDGES = "inStreamEdges";
+ private static final String OPERATOR_NAME = "operatorName";
 private static final String CHECKPOINTING_ENABLED = "checkpointing";
 private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -388,6 +389,14 @@ public class StreamConfig implements Serializable {
 throw new StreamTaskException("Could not instantiate configuration.", e);
 }
 }
+
+ public void setOperatorName(String name) {
+ this.config.setString(OPERATOR_NAME,name);
+ }
+
+ public String getOperatorName() {
+ return this.config.getString(OPERATOR_NAME, null);
+ }
 public void setChainIndex(int index) {
 this.config.setInteger(CHAIN_INDEX, index);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 71cc7f2..d6819e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -213,6 +213,7 @@ public class StreamingJobGraphGenerator {
 config.setChainStart();
 config.setChainIndex(0);
+ config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 config.setOutEdgesInOrder(transitiveOutEdges);
 config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
@@ -230,6 +231,7 @@ public class StreamingJobGraphGenerator {
 chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
 }
 config.setChainIndex(chainIndex);
+ config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 chainedConfigs.get(startNodeId).put(currentNodeId, config);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 0269a34..d51c320 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -102,9 +102,8 @@ public abstract class AbstractStreamOperator<OUT>
 public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
 this.container = containingTask;
 this.config = config;
- String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
- this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
+ this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
 this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
 this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
