Repository: flink
Updated Branches:
  refs/heads/master 104958523 -> c78b3c49e


http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
index f51cb68..86be7b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
@@ -76,6 +76,7 @@ public class ChainedFlatMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        @Override
        public void collect(IT record) {
                try {
+                       this.numRecordsIn.inc();
                        this.mapper.flatMap(record, this.outputCollector);
                } catch (Exception ex) {
                        throw new 
ExceptionInChainedStubException(this.taskName, ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
index 9b888f2..cef1b73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@@ -75,6 +75,7 @@ public class ChainedMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        @Override
        public void collect(IT record) {
                try {
+                       this.numRecordsIn.inc();
                        this.outputCollector.collect(this.mapper.map(record));
                } catch (Exception ex) {
                        throw new 
ExceptionInChainedStubException(this.taskName, ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
index 3912b98..e3de1c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
@@ -59,6 +59,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> 
extends ChainedDriver<IT,
 
        @Override
        public void collect(IT record) {
+               numRecordsIn.inc();
                agg.aggregate(1);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index 63f2b20..e6c8c2f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -167,6 +167,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends 
ChainedDriver<IN, OUT> {
 
        @Override
        public void collect(IN record) {
+               numRecordsIn.inc();
                // try writing to the sorter first
                try {
                        if (this.sorter.write(record)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 408abc2..a003d9e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -163,6 +163,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> 
extends ChainedDriver<IN,
 
        @Override
        public void collect(IN record) {
+               this.numRecordsIn.inc();
                // try writing to the sorter first
                try {
                        if (this.sorter.write(record)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
new file mode 100644
index 0000000..f7a1df9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Collector;
+
+/**
+ * {@link Collector} decorator that increments a {@link Counter} once for every
+ * record handed to {@link #collect(Object)} and then forwards that record to
+ * the wrapped collector.
+ *
+ * @param <OUT> type of the collected records
+ */
+public class CountingCollector<OUT> implements Collector<OUT> {
+
+       /** Collector that receives every record after it has been counted. */
+       private final Collector<OUT> delegate;
+
+       /** Counter that is incremented once per collected record. */
+       private final Counter recordCounter;
+
+       /**
+        * Creates a counting wrapper around the given collector.
+        *
+        * @param collector collector that records are forwarded to
+        * @param numRecordsOut counter incremented for each collected record
+        */
+       public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
+               delegate = collector;
+               recordCounter = numRecordsOut;
+       }
+
+       /**
+        * Counts the record first and then passes it on to the wrapped collector.
+        *
+        * @param record record to count and forward
+        */
+       @Override
+       public void collect(OUT record) {
+               recordCounter.inc();
+               delegate.collect(record);
+       }
+
+       /** Closes the wrapped collector; the counter is not touched. */
+       @Override
+       public void close() {
+               delegate.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
new file mode 100644
index 0000000..7494108
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.Iterator;
+
+/**
+ * {@link Iterable} wrapper that hands out {@link CountingIterator}s.
+ *
+ * <p>Every call to {@link #iterator()} obtains a new iterator from the wrapped
+ * iterable and wraps it together with the counter given at construction time.
+ *
+ * @param <IN> type of the iterated elements
+ */
+public class CountingIterable<IN> implements Iterable<IN> {
+
+       /** Iterable that supplies the underlying iterators. */
+       private final Iterable<IN> wrapped;
+
+       /** Counter passed on to every iterator created by this iterable. */
+       private final Counter counter;
+
+       /**
+        * Creates a counting wrapper around the given iterable.
+        *
+        * @param iterable iterable whose iterators are wrapped
+        * @param numRecordsIn counter handed to each created iterator
+        */
+       public CountingIterable(Iterable<IN> iterable, Counter numRecordsIn) {
+               wrapped = iterable;
+               counter = numRecordsIn;
+       }
+
+       /**
+        * Returns a counting iterator over the wrapped iterable.
+        *
+        * @return a new {@link CountingIterator} bound to this iterable's counter
+        */
+       @Override
+       public Iterator<IN> iterator() {
+               Iterator<IN> source = wrapped.iterator();
+               return new CountingIterator<>(source, counter);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
new file mode 100644
index 0000000..fe89358
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.Iterator;
+
+/**
+ * {@link Iterator} wrapper that increments a {@link Counter} for every element
+ * successfully returned by {@link #next()}.
+ *
+ * @param <IN> type of the iterated elements
+ */
+public class CountingIterator<IN> implements Iterator<IN> {
+
+       /** Iterator that all calls are delegated to. */
+       private final Iterator<IN> iterator;
+
+       /** Counter incremented once per element returned by {@link #next()}. */
+       private final Counter numRecordsIn;
+
+       /**
+        * Creates a counting wrapper around the given iterator.
+        *
+        * @param iterator iterator to delegate to
+        * @param numRecordsIn counter incremented for each returned element
+        */
+       public CountingIterator(Iterator<IN> iterator, Counter numRecordsIn) {
+               this.iterator = iterator;
+               this.numRecordsIn = numRecordsIn;
+       }
+
+       /**
+        * Delegates to the wrapped iterator without affecting the counter.
+        *
+        * @return whatever the wrapped iterator's {@code hasNext()} returns
+        */
+       @Override
+       public boolean hasNext() {
+               return this.iterator.hasNext();
+       }
+
+       /**
+        * Returns the next element of the wrapped iterator and counts it.
+        *
+        * <p>The counter is incremented only after the wrapped iterator has
+        * returned, so a call that throws (for example on an exhausted iterator)
+        * does not inflate the count.
+        *
+        * @return the next element of the wrapped iterator
+        */
+       @Override
+       public IN next() {
+               // Delegate first: a failing call must not be counted as a record.
+               final IN element = this.iterator.next();
+               this.numRecordsIn.inc();
+               return element;
+       }
+
+       /** Delegates removal to the wrapped iterator; the counter is not changed. */
+       @Override
+       public void remove() {
+               this.iterator.remove();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
new file mode 100644
index 0000000..e4b436a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+
+/**
+ * {@link MutableObjectIterator} wrapper that increments a {@link Counter} for
+ * every non-null record returned by the wrapped iterator.
+ *
+ * <p>A {@code null} result is passed through without being counted.
+ * NOTE(review): presumably {@code null} signals end of input; confirm against
+ * the {@code MutableObjectIterator} contract.
+ *
+ * @param <IN> type of the iterated records
+ */
+public class CountingMutableObjectIterator<IN> implements MutableObjectIterator<IN> {
+
+       /** Iterator that all calls are delegated to. */
+       private final MutableObjectIterator<IN> source;
+
+       /** Counter incremented once per non-null record. */
+       private final Counter counter;
+
+       /**
+        * Creates a counting wrapper around the given iterator.
+        *
+        * @param iterator iterator to delegate to
+        * @param numRecordsIn counter incremented for each non-null record
+        */
+       public CountingMutableObjectIterator(MutableObjectIterator<IN> iterator, Counter numRecordsIn) {
+               source = iterator;
+               counter = numRecordsIn;
+       }
+
+       /**
+        * Fetches the next record via the wrapped iterator's {@code next(reuse)}.
+        *
+        * @param reuse object passed through to the wrapped iterator
+        * @return the wrapped iterator's result, counted when it is not {@code null}
+        * @throws IOException if the wrapped iterator throws it
+        */
+       @Override
+       public IN next(IN reuse) throws IOException {
+               return countIfPresent(source.next(reuse));
+       }
+
+       /**
+        * Fetches the next record via the wrapped iterator's {@code next()}.
+        *
+        * @return the wrapped iterator's result, counted when it is not {@code null}
+        * @throws IOException if the wrapped iterator throws it
+        */
+       @Override
+       public IN next() throws IOException {
+               return countIfPresent(source.next());
+       }
+
+       /** Increments the counter unless {@code record} is null; returns {@code record} unchanged. */
+       private IN countIfPresent(IN record) {
+               if (record == null) {
+                       return null;
+               }
+               counter.inc();
+               return record;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 017b16b..58eb90c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -319,7 +319,8 @@ public class Task implements Runnable {
 
                for (int i = 0; i < this.inputGates.length; i++) {
                        SingleInputGate gate = SingleInputGate.create(
-                                       taskNameWithSubtaskAndId, jobId, 
executionId, consumedPartitions.get(i), networkEnvironment);
+                                       taskNameWithSubtaskAndId, jobId, 
executionId, consumedPartitions.get(i), networkEnvironment, 
+                                       metricGroup.getIOMetricGroup());
 
                        this.inputGates[i] = gate;
                        inputGatesById.put(gate.getConsumedResultId(), gate);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 9724a80..6853722 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -190,9 +189,5 @@ public class AbstractReaderTest {
                public void setReporter(AccumulatorRegistry.Reporter reporter) {
 
                }
-
-               @Override
-               public void setMetricGroup(IOMetricGroup metrics) {
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 9717530..da15f08 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -118,7 +119,7 @@ public class InputChannelTest {
                                ResultPartitionID partitionId,
                                Tuple2<Integer, Integer> initialAndMaxBackoff) {
 
-                       super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff);
+                       super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, new Counter());
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 8852b4f..f91a4ba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -271,7 +272,8 @@ public class LocalInputChannelTest {
                                new ResultPartitionID(),
                                partitionManager,
                                mock(TaskEventDispatcher.class),
-                               initialAndMaxRequestBackoff);
+                               initialAndMaxRequestBackoff,
+                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
        }
 
        /**
@@ -346,7 +348,8 @@ public class LocalInputChannelTest {
                                        new IntermediateDataSetID(),
                                        subpartitionIndex,
                                        numberOfInputChannels,
-                                       mock(PartitionStateChecker.class));
+                                       mock(PartitionStateChecker.class),
+                                       new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                        // Set buffer pool
                        inputGate.setBufferPool(bufferPool);
@@ -360,7 +363,8 @@ public class LocalInputChannelTest {
                                                                i,
                                                                
consumedPartitionIds[i],
                                                                
partitionManager,
-                                                               
taskEventDispatcher));
+                                                               
taskEventDispatcher,
+                                                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup()));
                        }
 
                        this.numberOfInputChannels = numberOfInputChannels;

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index c484cc4..9eb49ef 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import scala.Tuple2;
 
@@ -247,7 +248,8 @@ public class RemoteInputChannelTest {
                                0,
                                partitionId,
                                mock(ConnectionID.class),
-                               connectionManager);
+                               connectionManager,
+                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                ch.onFailedPartitionRequest();
 
@@ -266,7 +268,8 @@ public class RemoteInputChannelTest {
                                0,
                                new ResultPartitionID(),
                                mock(ConnectionID.class),
-                               connManager);
+                               connManager,
+                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                ch.onError(new ProducerFailedException(new 
RuntimeException("Expected test exception.")));
 
@@ -301,6 +304,7 @@ public class RemoteInputChannelTest {
                                new ResultPartitionID(),
                                mock(ConnectionID.class),
                                connectionManager,
-                               initialAndMaxRequestBackoff);
+                               initialAndMaxRequestBackoff,
+                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index c4bb785..05427a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import scala.Tuple2;
 
@@ -66,7 +67,7 @@ public class SingleInputGateTest {
        public void testBasicGetNextLogic() throws Exception {
                // Setup
                final SingleInputGate inputGate = new SingleInputGate(
-                               "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, 
mock(PartitionStateChecker.class));
+                               "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                final TestInputChannel[] inputChannels = new TestInputChannel[]{
                                new TestInputChannel(inputGate, 0),
@@ -113,7 +114,7 @@ public class SingleInputGateTest {
                // Setup reader with one local and one unknown input channel
                final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
 
-               final SingleInputGate inputGate = new SingleInputGate("Test 
Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, 
mock(PartitionStateChecker.class));
+               final SingleInputGate inputGate = new SingleInputGate("Test 
Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
                final BufferPool bufferPool = mock(BufferPool.class);
                
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -122,12 +123,12 @@ public class SingleInputGateTest {
                // Local
                ResultPartitionID localPartitionId = new ResultPartitionID(new 
IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-               InputChannel local = new LocalInputChannel(inputGate, 0, 
localPartitionId, partitionManager, taskEventDispatcher);
+               InputChannel local = new LocalInputChannel(inputGate, 0, 
localPartitionId, partitionManager, taskEventDispatcher, new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                // Unknown
                ResultPartitionID unknownPartitionId = new 
ResultPartitionID(new IntermediateResultPartitionID(), new 
ExecutionAttemptID());
 
-               InputChannel unknown = new UnknownInputChannel(inputGate, 1, 
unknownPartitionId, partitionManager, taskEventDispatcher, 
mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0));
+               InputChannel unknown = new UnknownInputChannel(inputGate, 1, 
unknownPartitionId, partitionManager, taskEventDispatcher, 
mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                // Set channels
                inputGate.setInputChannel(localPartitionId.getPartitionId(), 
local);
@@ -168,7 +169,7 @@ public class SingleInputGateTest {
                                new IntermediateDataSetID(),
                                0,
                                1,
-                               mock(PartitionStateChecker.class));
+                               mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
@@ -179,7 +180,7 @@ public class SingleInputGateTest {
                                partitionManager,
                                new TaskEventDispatcher(),
                                new LocalConnectionManager(),
-                               new Tuple2<>(0, 0));
+                               new Tuple2<Integer, Integer>(0, 0), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
 
@@ -208,7 +209,8 @@ public class SingleInputGateTest {
                                new IntermediateDataSetID(),
                                0,
                                1,
-                               mock(PartitionStateChecker.class));
+                               mock(PartitionStateChecker.class),
+                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                InputChannel unknown = new UnknownInputChannel(
                                inputGate,
@@ -217,7 +219,8 @@ public class SingleInputGateTest {
                                new ResultPartitionManager(),
                                new TaskEventDispatcher(),
                                new LocalConnectionManager(),
-                               new Tuple2<>(0, 0));
+                               new Tuple2<Integer, Integer>(0, 0),
+                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 640c11a..607da94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -59,7 +60,7 @@ public class TestSingleInputGate {
                checkArgument(numberOfInputChannels >= 1);
 
                SingleInputGate realGate = new SingleInputGate(
-                               "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, 
mock(PartitionStateChecker.class));
+                               "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                this.inputGate = spy(realGate);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index d8714d1..28f621f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -43,8 +44,8 @@ public class UnionInputGateTest {
        public void testBasicGetNextLogic() throws Exception {
                // Setup
                final String testTaskName = "Test Task";
-               final SingleInputGate ig1 = new SingleInputGate(testTaskName, 
new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, 
mock(PartitionStateChecker.class));
-               final SingleInputGate ig2 = new SingleInputGate(testTaskName, 
new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, 
mock(PartitionStateChecker.class));
+               final SingleInputGate ig1 = new SingleInputGate(testTaskName, 
new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+               final SingleInputGate ig2 = new SingleInputGate(testTaskName, 
new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                final UnionInputGate union = new UnionInputGate(new 
SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index a2edce2..6d3f768 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.metrics.groups.JobMetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
@@ -65,4 +66,19 @@ public class UnregisteredTaskMetricsGroup extends 
TaskMetricGroup {
                        super(EMPTY_REGISTRY, new 
DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
                }
        }
+       
+       public static class DummyIOMetricGroup extends IOMetricGroup { // IOMetricGroup stub for tests; its addMetric is a no-op
+               public DummyIOMetricGroup() { // no-arg constructor so tests can create it inline
+                       super(EMPTY_REGISTRY, new 
UnregisteredTaskMetricsGroup());
+               }
+
+               @Override
+               protected void addMetric(String name, Metric metric) { // intentionally empty: the metric is discarded
+               }
+
+               @Override
+               public MetricGroup addGroup(String name) { // the name argument is not used
+                       return new UnregisteredMetricsGroup(); // a new instance on every call
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 326a42f..dc7bbdb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -25,10 +25,12 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -102,10 +104,10 @@ public abstract class AbstractStreamOperator<OUT>
        public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<OUT>> output) {
                this.container = containingTask;
                this.config = config;
-               this.output = output;
                String operatorName = 
containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
                
                this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
+               this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
                this.runtimeContext = new StreamingRuntimeContext(this, 
container.getEnvironment(), container.getAccumulatorMap());
 
                stateKeySelector1 = config.getStatePartitioner(0, 
getUserCodeClassloader());
@@ -334,4 +336,30 @@ public abstract class AbstractStreamOperator<OUT>
        public void disableInputCopy() {
                this.inputCopyDisabled = true;
        }
+
+       public class CountingOutput implements Output<StreamRecord<OUT>> { // Output wrapper: counts collected records, forwards every call
+               private final Output<StreamRecord<OUT>> output; // wrapped output that receives all calls
+               private final Counter numRecordsOut; // incremented once per collect() call
+
+               public CountingOutput(Output<StreamRecord<OUT>> output, Counter 
counter) {
+                       this.output = output;
+                       this.numRecordsOut = counter;
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) { // watermarks are forwarded without touching the counter
+                       output.emitWatermark(mark);
+               }
+
+               @Override
+               public void collect(StreamRecord<OUT> record) {
+                       numRecordsOut.inc(); // counted before forwarding, so it is incremented even if the wrapped collect throws
+                       output.collect(record);
+               }
+
+               @Override
+               public void close() { // closes the wrapped output; the counter is left as is
+                       output.close();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 84f59ed..68c623e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -51,6 +51,10 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                this.chainingStrategy = ChainingStrategy.HEAD;
        }
 
+       public void run(final Object lockingObject) throws Exception { // convenience overload without an explicit collector
+               run(lockingObject, output); // delegates to the two-argument run, passing the `output` field
+       }
+
        
        public void run(final Object lockingObject, final 
Output<StreamRecord<OUT>> collector) throws Exception {
                final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index ab69ab7..33a0407 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -81,6 +83,8 @@ public class StreamInputProcessor<IN> {
 
        private final DeserializationDelegate<StreamElement> 
deserializationDelegate;
 
+       private Counter numRecordsIn;
+
        @SuppressWarnings("unchecked")
        public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> 
inputSerializer,
                                                                
EventListener<CheckpointBarrier> checkpointListener,
@@ -133,6 +137,9 @@ public class StreamInputProcessor<IN> {
                if (isFinished) {
                        return false;
                }
+               if (numRecordsIn == null) {
+                       numRecordsIn = 
streamOperator.getMetricGroup().counter("numRecordsIn");
+               }
 
                while (true) {
                        if (currentRecordDeserializer != null) {
@@ -166,6 +173,7 @@ public class StreamInputProcessor<IN> {
                                                // now we can do the actual 
processing
                                                StreamRecord<IN> record = 
recordOrWatermark.asRecord();
                                                synchronized (lock) {
+                                                       numRecordsIn.inc();
                                                        
streamOperator.setKeyContextElement1(record);
                                                        
streamOperator.processElement(record);
                                                }
@@ -211,9 +219,12 @@ public class StreamInputProcessor<IN> {
         * @param metrics metric group
      */
        public void setMetricGroup(IOMetricGroup metrics) {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       deserializer.instantiateMetrics(metrics);
-               }
+               metrics.gauge("currentLowWatermark", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return lastEmittedWatermark;
+                       }
+               });
        }
        
        public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 733e7fb..1a66934 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -287,9 +288,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
         * @param metrics metric group
         */
        public void setMetricGroup(IOMetricGroup metrics) {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       deserializer.instantiateMetrics(metrics);
-               }
+               metrics.gauge("currentLowWatermark", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return Math.min(lastEmittedWatermark1, 
lastEmittedWatermark2);
+                       }
+               });
        }
        
        public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 90abea4..761aa37 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -298,14 +299,17 @@ public class OperatorChain<OUT> {
        private static class ChainingOutput<T> implements 
Output<StreamRecord<T>> {
                
                protected final OneInputStreamOperator<T, ?> operator;
+               protected final Counter numRecordsIn;
 
                public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
                        this.operator = operator;
+                       this.numRecordsIn = 
operator.getMetricGroup().counter("numRecordsIn");
                }
 
                @Override
                public void collect(StreamRecord<T> record) {
                        try {
+                               numRecordsIn.inc();
                                operator.setKeyContextElement1(record);
                                operator.processElement(record);
                        }
@@ -347,6 +351,7 @@ public class OperatorChain<OUT> {
                @Override
                public void collect(StreamRecord<T> record) {
                        try {
+                               numRecordsIn.inc();
                                StreamRecord<T> copy = 
record.copy(serializer.copy(record.getValue()));
                                operator.setKeyContextElement1(copy);
                                operator.processElement(copy);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index af9278f..7ae99f6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
 
        @Override
        protected void run() throws Exception {
-               headOperator.run(getCheckpointLock(), getHeadOutput());
+               headOperator.run(getCheckpointLock());
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index af7d3f9..58e3cb8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -56,13 +57,27 @@ public class StreamIterationTail<IN> extends 
OneInputStreamTask<IN, IN> {
                
                LOG.info("Iteration tail {} acquired feedback queue {}", 
getName(), brokerID);
                
-               this.headOperator = new RecordPusher<>(dataChannel, 
iterationWaitTime);
+               this.headOperator = new RecordPusher<>();
+               this.headOperator.setup(this, getConfiguration(), new 
IterationTailOutput<>(dataChannel, iterationWaitTime));
        }
 
        private static class RecordPusher<IN> extends 
AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> { // operator that passes each record straight to its output
                
                private static final long serialVersionUID = 1L;
 
+               @Override
+               public void processElement(StreamRecord<IN> record) throws 
Exception {
+                       output.collect(record); // no transformation: the record is handed to `output` unchanged
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) {
+                       // ignore
+               }
+       }
+
+       private static class IterationTailOutput<IN> implements 
Output<StreamRecord<IN>> {
+
                @SuppressWarnings("NonSerializableFieldInSerializableClass")
                private final BlockingQueue<StreamRecord<IN>> dataChannel;
                
@@ -70,25 +85,32 @@ public class StreamIterationTail<IN> extends 
OneInputStreamTask<IN, IN> {
                
                private final boolean shouldWait;
 
-               RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long 
iterationWaitTime) {
+               IterationTailOutput(BlockingQueue<StreamRecord<IN>> 
dataChannel, long iterationWaitTime) {
                        this.dataChannel = dataChannel;
                        this.iterationWaitTime = iterationWaitTime;
                        this.shouldWait =  iterationWaitTime > 0;
                }
 
                @Override
-               public void processElement(StreamRecord<IN> record) throws 
Exception {
-                       if (shouldWait) {
-                               dataChannel.offer(record, iterationWaitTime, 
TimeUnit.MILLISECONDS);
-                       }
-                       else {
-                               dataChannel.put(record);
+               public void emitWatermark(Watermark mark) {
+               }
+
+               @Override
+               public void collect(StreamRecord<IN> record) {
+                       try {
+                               if (shouldWait) {
+                                       dataChannel.offer(record, 
iterationWaitTime, TimeUnit.MILLISECONDS);
+                               }
+                               else {
+                                       dataChannel.put(record);
+                               }
+                       } catch (InterruptedException e) {
+                               throw new RuntimeException(e);
                        }
                }
 
                @Override
-               public void processWatermark(Watermark mark) {
-                       // ignore
+               public void close() {
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a771c85..444245c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -154,6 +155,8 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
 
        private long recoveryTimestamp;
 
+       private long lastCheckpointSize = 0;
+
        // 
------------------------------------------------------------------------
        //  Life cycle methods for specific implementations
        // 
------------------------------------------------------------------------
@@ -194,6 +197,13 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        // allow trigger tasks to be removed if all timers for 
that timestamp are removed by user
                        timerService.setRemoveOnCancelPolicy(true);
 
+                       
getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() 
{
+                               @Override
+                               public Long getValue() {
+                                       return 
StreamTask.this.lastCheckpointSize;
+                               }
+                       });
+
                        // task specific initialization
                        init();
                        
@@ -538,6 +548,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                if (allStates.isEmpty()) {
                                        
getEnvironment().acknowledgeCheckpoint(checkpointId);
                                } else if (!hasAsyncStates) {
+                                       this.lastCheckpointSize = 
allStates.getStateSize();
                                        
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
                                } else {
                                        // start a Thread that does the 
asynchronous materialization and
@@ -572,6 +583,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                                                        }
                                                                }
                                                                
StreamTaskStateList allStates = new StreamTaskStateList(states);
+                                                               
StreamTask.this.lastCheckpointSize = allStates.getStateSize();
                                                                
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
                                                                
LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", 
checkpointId, getName());
                                                        }

Reply via email to