Repository: flink
Updated Branches:
  refs/heads/release-1.1 05a5f460b -> dc768d3b7


[FLINK-4875] [metrics] Use correct operator name

This closes #2676.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc768d3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc768d3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc768d3b

Branch: refs/heads/release-1.1
Commit: dc768d3b781eb0cd0baca13746da55a0bebef115
Parents: 05a5f46
Author: zentol <[email protected]>
Authored: Thu Oct 20 15:53:03 2016 +0200
Committer: zentol <[email protected]>
Committed: Tue Oct 25 10:47:00 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/graph/StreamConfig.java  | 9 +++++++++
 .../streaming/api/graph/StreamingJobGraphGenerator.java     | 2 ++
 .../streaming/api/operators/AbstractStreamOperator.java     | 3 +--
 3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 783b3e2..eb31fda 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -68,6 +68,7 @@ public class StreamConfig implements Serializable {
        private static final String EDGES_IN_ORDER = "edgesInOrder";
        private static final String OUT_STREAM_EDGES = "outStreamEdges";
        private static final String IN_STREAM_EDGES = "inStreamEdges";
+       private static final String OPERATOR_NAME = "operatorName";
 
        private static final String CHECKPOINTING_ENABLED = "checkpointing";
        private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -388,6 +389,14 @@ public class StreamConfig implements Serializable {
                        throw new StreamTaskException("Could not instantiate 
configuration.", e);
                }
        }
+       
+       public void setOperatorName(String name) {
+               this.config.setString(OPERATOR_NAME,name);
+       }
+       
+       public String getOperatorName() {
+               return this.config.getString(OPERATOR_NAME, null);
+       }
 
        public void setChainIndex(int index) {
                this.config.setInteger(CHAIN_INDEX, index);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 71cc7f2..d6819e1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -213,6 +213,7 @@ public class StreamingJobGraphGenerator {
 
                                config.setChainStart();
                                config.setChainIndex(0);
+                               
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                                config.setOutEdgesInOrder(transitiveOutEdges);
                                
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
 
@@ -230,6 +231,7 @@ public class StreamingJobGraphGenerator {
                                        chainedConfigs.put(startNodeId, new 
HashMap<Integer, StreamConfig>());
                                }
                                config.setChainIndex(chainIndex);
+                               
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                                
chainedConfigs.get(startNodeId).put(currentNodeId, config);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 0269a34..d51c320 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -102,9 +102,8 @@ public abstract class AbstractStreamOperator<OUT>
        public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<OUT>> output) {
                this.container = containingTask;
                this.config = config;
-               String operatorName = 
containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
                
-               this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
+               this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
                this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
                this.runtimeContext = new StreamingRuntimeContext(this, 
container.getEnvironment(), container.getAccumulatorMap());
 

Reply via email to