[FLINK-4733] Reuse operator IO metrics for task

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f4f6f92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f4f6f92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f4f6f92

Branch: refs/heads/master
Commit: 1f4f6f928cb71fcbcba8d5aca731e738352d1175
Parents: 99f1dc3
Author: zentol <[email protected]>
Authored: Mon Oct 31 14:08:26 2016 +0100
Committer: zentol <[email protected]>
Committed: Mon Oct 31 15:12:02 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/IOMetrics.java | 99 ++++++++++++++++++++
 .../metrics/groups/OperatorIOMetricGroup.java   | 17 ++++
 .../metrics/groups/TaskIOMetricGroup.java       | 67 ++++++++++++-
 .../flink/runtime/operators/BatchTask.java      |  8 ++
 .../flink/runtime/operators/DataSinkTask.java   |  2 +
 .../flink/runtime/operators/DataSourceTask.java |  4 +
 .../operators/chaining/ChainedDriver.java       |  4 +
 .../metrics/groups/TaskIOMetricGroupTest.java   | 61 ++++++++++++
 .../flink/streaming/api/graph/StreamConfig.java |  9 ++
 .../api/graph/StreamingJobGraphGenerator.java   |  3 +
 .../api/operators/AbstractStreamOperator.java   |  6 ++
 11 files changed, 278 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
new file mode 100644
index 0000000..15c54b4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.Meter;
+
+import java.io.Serializable;
+
+/**
+ * An instance of this class represents a snapshot of the io-related metrics 
of a single task.
+ */
+public class IOMetrics implements Serializable {
+       private static final long serialVersionUID = -7208093607556457183L;
+       private final long numRecordsIn;
+       private final long numRecordsOut;
+
+       private final double numRecordsInPerSecond;
+       private final double numRecordsOutPerSecond;
+
+       private final long numBytesInLocal;
+       private final long numBytesInRemote;
+       private final long numBytesOut;
+
+       private final double numBytesInLocalPerSecond;
+       private final double numBytesInRemotePerSecond;
+       private final double numBytesOutPerSecond;
+
+       public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, 
Meter bytesRemoteIn, Meter bytesOut) {
+               this.numRecordsIn = recordsIn.getCount();
+               this.numRecordsInPerSecond = recordsIn.getRate();
+               this.numRecordsOut = recordsOut.getCount();
+               this.numRecordsOutPerSecond = recordsOut.getRate();
+               this.numBytesInLocal = bytesLocalIn.getCount();
+               this.numBytesInLocalPerSecond = bytesLocalIn.getRate();
+               this.numBytesInRemote = bytesRemoteIn.getCount();
+               this.numBytesInRemotePerSecond = bytesRemoteIn.getRate();
+               this.numBytesOut = bytesOut.getCount();
+               this.numBytesOutPerSecond = bytesOut.getRate();
+       }
+
+       public long getNumRecordsIn() {
+               return numRecordsIn;
+       }
+
+       public long getNumRecordsOut() {
+               return numRecordsOut;
+       }
+
+       public long getNumBytesInLocal() {
+               return numBytesInLocal;
+       }
+
+       public long getNumBytesInRemote() {
+               return numBytesInRemote;
+       }
+
+       public long getNumBytesInTotal() {
+               return numBytesInLocal + numBytesInRemote;
+       }
+
+       public long getNumBytesOut() {
+               return numBytesOut;
+       }
+
+       public double getNumRecordsInPerSecond() {
+               return numRecordsInPerSecond;
+       }
+
+       public double getNumRecordsOutPerSecond() {
+               return numRecordsOutPerSecond;
+       }
+
+       public double getNumBytesInLocalPerSecond() {
+               return numBytesInLocalPerSecond;
+       }
+
+       public double getNumBytesInRemotePerSecond() {
+               return numBytesInRemotePerSecond;
+       }
+
+       public double getNumBytesOutPerSecond() {
+               return numBytesOutPerSecond;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 32611fd..2e321fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -56,4 +56,21 @@ public class OperatorIOMetricGroup extends 
ProxyMetricGroup<OperatorMetricGroup>
        public Meter getNumRecordsOutRate() {
                return numRecordsOutRate;
        }
+
+       /**
+        * Causes the containing task to use this operators input record 
counter.
+        */
+       public void reuseInputMetricsForTask() {
+               TaskIOMetricGroup taskIO = 
parentMetricGroup.parent().getIOMetricGroup();
+               taskIO.reuseRecordsInputCounter(this.numRecordsIn);
+               
+       }
+
+       /**
+        * Causes the containing task to use this operators output record 
counter.
+        */
+       public void reuseOutputMetricsForTask() {
+               TaskIOMetricGroup taskIO = 
parentMetricGroup.parent().getIOMetricGroup();
+               taskIO.reuseRecordsOutputCounter(this.numRecordsOut);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index b2884ec..4e32563 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -23,9 +23,14 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The 
metrics registration is
@@ -36,10 +41,14 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
        private final Counter numBytesOut;
        private final Counter numBytesInLocal;
        private final Counter numBytesInRemote;
+       private final SumCounter numRecordsIn;
+       private final SumCounter numRecordsOut;
 
        private final Meter numBytesInRateLocal;
        private final Meter numBytesInRateRemote;
        private final Meter numBytesOutRate;
+       private final Meter numRecordsInRate;
+       private final Meter numRecordsOutRate;
 
        private final MetricGroup buffers;
 
@@ -52,10 +61,21 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
                this.numBytesOutRate = meter("numBytesOutPerSecond", new 
MeterView(numBytesOut, 60));
                this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", 
new MeterView(numBytesInLocal, 60));
                this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", 
new MeterView(numBytesInRemote, 60));
+               this.numRecordsIn = counter("numRecordsIn", new SumCounter());
+               this.numRecordsOut = counter("numRecordsOut", new SumCounter());
+               this.numRecordsInRate = meter("numRecordsInPerSecond", new 
MeterView(numRecordsIn, 60));
+               this.numRecordsOutRate = meter("numRecordsOutPerSecond", new 
MeterView(numRecordsOut, 60));
 
                this.buffers = addGroup("buffers");
        }
 
+       public IOMetrics createSnapshot() {
+               return new IOMetrics(numRecordsInRate, numRecordsOutRate, 
numBytesInRateLocal, numBytesInRateRemote, numBytesOutRate);
+       }
+
+       // 
============================================================================================
+       // Getters
+       // 
============================================================================================
        public Counter getNumBytesOutCounter() {
                return numBytesOut;
        }
@@ -68,11 +88,19 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
                return numBytesInRemote;
        }
 
-       public Meter getNumBytesInRateLocalMeter() {
+       public Counter getNumRecordsInCounter() {
+               return numRecordsIn;
+       }
+
+       public Counter getNumRecordsOutCounter() {
+               return numRecordsOut;
+       }
+
+       public Meter getNumBytesInLocalRateMeter() {
                return numBytesInRateLocal;
        }
 
-       public Meter getNumBytesInRateRemoteMeter() {
+       public Meter getNumBytesInRemoteRateMeter() {
                return numBytesInRateRemote;
        }
 
@@ -134,6 +162,41 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
                }
        }
 
+       // 
============================================================================================
+       // Metric Reuse
+       // 
============================================================================================
+       public void reuseRecordsInputCounter(Counter numRecordsInCounter) {
+               this.numRecordsIn.addCounter(numRecordsInCounter);
+       }
+
+       public void reuseRecordsOutputCounter(Counter numRecordsOutCounter) {
+               this.numRecordsOut.addCounter(numRecordsOutCounter);
+       }
+
+       /**
+        * A {@link SimpleCounter} that can contain other {@link Counter}s. A 
call to {@link SumCounter#getCount()} returns
+        * the sum of this counters and all contained counters.
+        */
+       private static class SumCounter extends SimpleCounter {
+               private final List<Counter> internalCounters = new 
ArrayList<>();
+
+               SumCounter() {
+               }
+
+               public void addCounter(Counter toAdd) {
+                       internalCounters.add(toAdd);
+               }
+
+               @Override
+               public long getCount() {
+                       long sum = super.getCount();
+                       for (Counter counter : internalCounters) {
+                               sum += counter.getCount();
+                       }
+                       return sum;
+               }
+       }
+
        /**
         * Input buffer pool usage gauge of a task
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 354dbac..e896639 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -243,6 +243,10 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                String headName =  
getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
                this.metrics = getEnvironment().getMetricGroup()
                        .addOperator(headName.startsWith("CHAIN") ? 
headName.substring(6) : headName);
+               this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
+               if (config.getNumberOfChainedStubs() == 0) {
+                       
this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
+               }
 
                // initialize the readers.
                // this does not yet trigger any stream consuming or processing.
@@ -1306,6 +1310,10 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                                ct.setup(chainedStubConf, taskName, previous, 
containingTask, cl, executionConfig, accumulatorMap);
                                chainedTasksTarget.add(0, ct);
 
+                               if (i == numChained - 1) {
+                                       
ct.getIOMetrics().reuseOutputMetricsForTask();
+                               }
+
                                previous = ct;
                        }
                        // the collector of the first in the chain is the 
collector for the task

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 4626b69..eb7999c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -109,6 +109,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
                RuntimeContext ctx = createRuntimeContext();
                final Counter numRecordsIn = ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+               ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
+               ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
                
                
if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
                        ((RichOutputFormat) this.format).setRuntimeContext(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 4dc3ef5..e89559d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -103,7 +103,11 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 
                RuntimeContext ctx = createRuntimeContext();
                Counter completedSplitsCounter = 
ctx.getMetricGroup().counter("numSplitsProcessed");
+               ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
                Counter numRecordsOut = ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter();
+               if (this.config.getNumberOfChainedStubs() == 0) {
+                       ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
+               }
 
                if 
(RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
                        ((RichInputFormat) this.format).setRuntimeContext(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index cf62dfa..442a53c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
@@ -104,6 +105,9 @@ public abstract class ChainedDriver<IT, OT> implements 
Collector<IT> {
        @Override
        public abstract void collect(IT record);
 
+       public OperatorIOMetricGroup getIOMetrics() {
+               return this.metrics.getIOMetricGroup();
+       }
        
        protected RuntimeContext getUdfRuntimeContext() {
                return this.udfContext;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
new file mode 100644
index 0000000..564a518
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TaskIOMetricGroupTest {
+       @Test
+       public void testTaskIOMetricGroup() {
+               TaskMetricGroup task = new UnregisteredTaskMetricsGroup();
+               TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+
+               // test counter forwarding
+               assertNotNull(taskIO.getNumRecordsInCounter());
+               assertNotNull(taskIO.getNumRecordsOutCounter());
+
+               Counter c1 = new SimpleCounter();
+               c1.inc(32L);
+               Counter c2 = new SimpleCounter();
+               c2.inc(64L);
+
+               taskIO.reuseRecordsInputCounter(c1);
+               taskIO.reuseRecordsOutputCounter(c2);
+               assertEquals(32L, taskIO.getNumRecordsInCounter().getCount());
+               assertEquals(64L, taskIO.getNumRecordsOutCounter().getCount());
+
+               // test IOMetrics instantiation
+               taskIO.getNumBytesInLocalCounter().inc(100L);
+               taskIO.getNumBytesInRemoteCounter().inc(150L);
+               taskIO.getNumBytesOutCounter().inc(250L);
+               
+               IOMetrics io = taskIO.createSnapshot();
+               assertEquals(32L, io.getNumRecordsIn());
+               assertEquals(64L, io.getNumRecordsOut());
+               assertEquals(100L, io.getNumBytesInLocal());
+               assertEquals(150L, io.getNumBytesInRemote());
+               assertEquals(250L, io.getNumBytesOut());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 2d38fb9..dd4c55c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -69,6 +69,7 @@ public class StreamConfig implements Serializable {
        private static final String OUT_STREAM_EDGES = "outStreamEdges";
        private static final String IN_STREAM_EDGES = "inStreamEdges";
        private static final String OPERATOR_NAME = "operatorName";
+       private static final String CHAIN_END = "chainEnd";
 
        private static final String CHECKPOINTING_ENABLED = "checkpointing";
        private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -478,6 +479,14 @@ public class StreamConfig implements Serializable {
                return config.getBoolean(IS_CHAINED_VERTEX, false);
        }
 
+       public void setChainEnd() {
+               config.setBoolean(CHAIN_END, true);
+       }
+
+       public boolean isChainEnd() {
+               return config.getBoolean(CHAIN_END, false);
+       }
+
        @Override
        public String toString() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 87fd7eb..8f9da8a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -231,6 +231,9 @@ public class StreamingJobGraphGenerator {
                                config.setChainIndex(chainIndex);
                                
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                                
chainedConfigs.get(startNodeId).put(currentNodeId, config);
+                               if (chainableOutputs.isEmpty()) {
+                                       config.setChainEnd();
+                               }
                        }
 
                        return transitiveOutEdges;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 5f0dd85..a659866 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -163,6 +163,12 @@ public abstract class AbstractStreamOperator<OUT>
                
                this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
                this.output = new CountingOutput(output, ((OperatorMetricGroup) 
this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
+               if (config.isChainStart()) {
+                       ((OperatorMetricGroup) 
this.metrics).getIOMetricGroup().reuseInputMetricsForTask();
+               }
+               if (config.isChainEnd()) {
+                       ((OperatorMetricGroup) 
this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
+               }
                Configuration taskManagerConfig = 
container.getEnvironment().getTaskManagerInfo().getConfiguration();
                int historySize = 
taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
                if (historySize <= 0) {

Reply via email to