Repository: flink
Updated Branches:
  refs/heads/master 227cdc829 -> b0753f193


[FLINK-4875] [metrics] Use correct operator name

This closes #2676.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0753f19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0753f19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0753f19

Branch: refs/heads/master
Commit: b0753f193cb7c8448547e254326911166d7b96a2
Parents: 227cdc8
Author: zentol <[email protected]>
Authored: Thu Oct 20 15:53:03 2016 +0200
Committer: zentol <[email protected]>
Committed: Sat Oct 22 11:13:28 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/graph/StreamConfig.java  | 9 +++++++++
 .../streaming/api/graph/StreamingJobGraphGenerator.java     | 2 ++
 .../streaming/api/operators/AbstractStreamOperator.java     | 3 +--
 3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0dd1b37..ffe8456 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -68,6 +68,7 @@ public class StreamConfig implements Serializable {
        private static final String EDGES_IN_ORDER = "edgesInOrder";
        private static final String OUT_STREAM_EDGES = "outStreamEdges";
        private static final String IN_STREAM_EDGES = "inStreamEdges";
+       private static final String OPERATOR_NAME = "operatorName";
 
        private static final String CHECKPOINTING_ENABLED = "checkpointing";
        private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -390,6 +391,14 @@ public class StreamConfig implements Serializable {
                        throw new StreamTaskException("Could not instantiate configuration.", e);
                }
        }
+       
+       public void setOperatorName(String name) {
+               this.config.setString(OPERATOR_NAME,name);
+       }
+       
+       public String getOperatorName() {
+               return this.config.getString(OPERATOR_NAME, null);
+       }
 
        public void setChainIndex(int index) {
                this.config.setInteger(CHAIN_INDEX, index);

http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 824e375..1d99cf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -211,6 +211,7 @@ public class StreamingJobGraphGenerator {
 
                                config.setChainStart();
                                config.setChainIndex(0);
+                               config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                                config.setOutEdgesInOrder(transitiveOutEdges);
                                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
 
@@ -228,6 +229,7 @@ public class StreamingJobGraphGenerator {
                                        chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
                                }
                                config.setChainIndex(chainIndex);
+                               config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                                chainedConfigs.get(startNodeId).put(currentNodeId, config);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 82ce493..f2da9da 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -157,9 +157,8 @@ public abstract class AbstractStreamOperator<OUT>
        public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
                this.container = containingTask;
                this.config = config;
-               String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
                
-               this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
+               this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
                this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
                Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
                int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);

Reply via email to