Repository: flink Updated Branches: refs/heads/master 104958523 -> c78b3c49e
http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java index f51cb68..86be7b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java @@ -76,6 +76,7 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @Override public void collect(IT record) { try { + this.numRecordsIn.inc(); this.mapper.flatMap(record, this.outputCollector); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java index 9b888f2..cef1b73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java @@ -75,6 +75,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @Override public void collect(IT record) { try { + this.numRecordsIn.inc(); this.outputCollector.collect(this.mapper.map(record)); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java index 3912b98..e3de1c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java @@ -59,6 +59,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> extends ChainedDriver<IT, @Override public void collect(IT record) { + numRecordsIn.inc(); agg.aggregate(1); } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java index 63f2b20..e6c8c2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java @@ -167,6 +167,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> { @Override public void collect(IN record) { + numRecordsIn.inc(); // try writing to the sorter first try { if (this.sorter.write(record)) { http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java index 408abc2..a003d9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java @@ -163,6 +163,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, @Override public void collect(IN record) { + this.numRecordsIn.inc(); // try writing to the sorter first try { if (this.sorter.write(record)) { http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java new file mode 100644 index 0000000..f7a1df9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.Collector; + +public class CountingCollector<OUT> implements Collector<OUT> { + private final Collector<OUT> collector; + private final Counter numRecordsOut; + + public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) { + this.collector = collector; + this.numRecordsOut = numRecordsOut; + } + + @Override + public void collect(OUT record) { + this.numRecordsOut.inc(); + this.collector.collect(record); + } + + @Override + public void close() { + this.collector.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java new file mode 100644 index 0000000..7494108 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util.metrics; + +import org.apache.flink.metrics.Counter; + +import java.util.Iterator; + +public class CountingIterable<IN> implements Iterable<IN> { + + private final Iterable<IN> iterable; + private final Counter numRecordsIn; + + public CountingIterable(Iterable<IN> iterable, Counter numRecordsIn) { + this.iterable = iterable; + this.numRecordsIn = numRecordsIn; + } + + @Override + public Iterator<IN> iterator() { + return new CountingIterator<>(this.iterable.iterator(), this.numRecordsIn); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java new file mode 100644 index 0000000..fe89358 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util.metrics; + +import org.apache.flink.metrics.Counter; + +import java.util.Iterator; + +public class CountingIterator<IN> implements Iterator<IN> { + private final Iterator<IN> iterator; + private final Counter numRecordsIn; + + public CountingIterator(Iterator<IN> iterator, Counter numRecordsIn) { + this.iterator = iterator; + this.numRecordsIn = numRecordsIn; + } + + @Override + public boolean hasNext() { + return this.iterator.hasNext(); + } + + @Override + public IN next() { + numRecordsIn.inc(); + return this.iterator.next(); + } + + @Override + public void remove() { + this.iterator.remove(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java new file mode 100644 index 0000000..e4b436a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; + +public class CountingMutableObjectIterator<IN> implements MutableObjectIterator<IN> { + private final MutableObjectIterator<IN> iterator; + private final Counter numRecordsIn; + + public CountingMutableObjectIterator(MutableObjectIterator<IN> iterator, Counter numRecordsIn) { + this.iterator = iterator; + this.numRecordsIn = numRecordsIn; + } + + @Override + public IN next(IN reuse) throws IOException { + IN next = iterator.next(reuse); + if (next != null) { + numRecordsIn.inc(); + } + return next; + } + + @Override + public IN next() throws IOException { + IN next = iterator.next(); + if (next != null) { + numRecordsIn.inc(); + } + return next; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 017b16b..58eb90c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -319,7 +319,8 @@ public class Task implements Runnable { for (int i = 0; i < this.inputGates.length; i++) { SingleInputGate gate = SingleInputGate.create( - taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment); + taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment, + metricGroup.getIOMetricGroup()); this.inputGates[i] = gate; inputGatesById.put(gate.getConsumedResultId(), gate); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java index 9724a80..6853722 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -190,9 +189,5 @@ public class AbstractReaderTest { public void setReporter(AccumulatorRegistry.Reporter reporter) { } - - @Override - public void setMetricGroup(IOMetricGroup metrics) { - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index 9717530..da15f08 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -118,7 +119,7 @@ public class InputChannelTest { ResultPartitionID partitionId, Tuple2<Integer, Integer> initialAndMaxBackoff) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff); + super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 8852b4f..f91a4ba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -271,7 +272,8 @@ public class LocalInputChannelTest { new ResultPartitionID(), partitionManager, mock(TaskEventDispatcher.class), - initialAndMaxRequestBackoff); + initialAndMaxRequestBackoff, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); } /** @@ -346,7 +348,8 @@ public class LocalInputChannelTest { new IntermediateDataSetID(), subpartitionIndex, numberOfInputChannels, - mock(PartitionStateChecker.class)); + mock(PartitionStateChecker.class), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); // Set buffer pool inputGate.setBufferPool(bufferPool); @@ -360,7 +363,8 @@ public class LocalInputChannelTest { i, consumedPartitionIds[i], partitionManager, - taskEventDispatcher)); + taskEventDispatcher, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup())); } this.numberOfInputChannels = numberOfInputChannels; http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index c484cc4..9eb49ef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.util.TestBufferFactory; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import scala.Tuple2; @@ -247,7 +248,8 @@ public class RemoteInputChannelTest { 0, partitionId, mock(ConnectionID.class), - connectionManager); + connectionManager, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); ch.onFailedPartitionRequest(); @@ -266,7 +268,8 @@ public class RemoteInputChannelTest { 0, new ResultPartitionID(), mock(ConnectionID.class), - connManager); + connManager, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception."))); @@ -301,6 +304,7 @@ public class RemoteInputChannelTest { new ResultPartitionID(), mock(ConnectionID.class), connectionManager, - initialAndMaxRequestBackoff); + initialAndMaxRequestBackoff, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index c4bb785..05427a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import scala.Tuple2; @@ -66,7 +67,7 @@ public class SingleInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class)); + "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final TestInputChannel[] inputChannels = new TestInputChannel[]{ new TestInputChannel(inputGate, 0), @@ -113,7 +114,7 @@ public class SingleInputGateTest { // Setup reader with one local and one unknown input channel final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class)); + final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -122,12 +123,12 @@ public class SingleInputGateTest { // Local ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher); + InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); // Unknown ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0)); + InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); // Set channels inputGate.setInputChannel(localPartitionId.getPartitionId(), local); @@ -168,7 +169,7 @@ public class SingleInputGateTest { new IntermediateDataSetID(), 0, 1, - mock(PartitionStateChecker.class)); + mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -179,7 +180,7 @@ public class SingleInputGateTest { partitionManager, new TaskEventDispatcher(), new LocalConnectionManager(), - new Tuple2<>(0, 0)); + new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -208,7 +209,8 @@ public class SingleInputGateTest { new IntermediateDataSetID(), 0, 1, - mock(PartitionStateChecker.class)); + mock(PartitionStateChecker.class), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); InputChannel unknown = new UnknownInputChannel( inputGate, @@ -217,7 +219,8 @@ public class SingleInputGateTest { new ResultPartitionManager(), new TaskEventDispatcher(), new LocalConnectionManager(), - new Tuple2<>(0, 0)); + new Tuple2<Integer, Integer>(0, 0), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 640c11a..607da94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.util.event.EventListener; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -59,7 +60,7 @@ public class TestSingleInputGate { checkArgument(numberOfInputChannels >= 1); SingleInputGate realGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class)); + "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); this.inputGate = spy(realGate); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index d8714d1..28f621f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -43,8 +44,8 @@ public class UnionInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final String testTaskName = "Test Task"; - final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class)); - final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class)); + final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2}); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java index a2edce2..6d3f768 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.metrics.groups.JobMetricGroup; import org.apache.flink.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.metrics.groups.TaskMetricGroup; @@ -65,4 +66,19 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup { super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob"); } } + + public static class DummyIOMetricGroup extends IOMetricGroup { + public DummyIOMetricGroup() { + super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup()); + } + + @Override + protected void addMetric(String name, Metric metric) { + } + + @Override + public MetricGroup addGroup(String name) { + return new UnregisteredMetricsGroup(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 326a42f..dc7bbdb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -25,10 +25,12 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.runtime.state.KvStateSnapshot; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -102,10 +104,10 @@ public abstract class AbstractStreamOperator<OUT> public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) { this.container = containingTask; this.config = config; - this.output = output; String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); + this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap()); stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader()); @@ -334,4 +336,30 @@ public abstract class AbstractStreamOperator<OUT> public void disableInputCopy() { this.inputCopyDisabled = true; } + + public class CountingOutput implements Output<StreamRecord<OUT>> { + private final Output<StreamRecord<OUT>> output; + private final Counter numRecordsOut; + + public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) { + this.output = output; + this.numRecordsOut = counter; + } + + @Override + public void emitWatermark(Watermark mark) { + output.emitWatermark(mark); + } + + @Override + public void collect(StreamRecord<OUT> record) { + numRecordsOut.inc(); + output.collect(record); + } + + @Override + public void close() { + output.close(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 84f59ed..68c623e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -51,6 +51,10 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> this.chainingStrategy = ChainingStrategy.HEAD; } + public void run(final Object lockingObject) throws Exception { + run(lockingObject, output); + } + public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception { final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index ab69ab7..33a0407 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; @@ -81,6 +83,8 @@ public class StreamInputProcessor<IN> { private final DeserializationDelegate<StreamElement> deserializationDelegate; + private Counter numRecordsIn; + @SuppressWarnings("unchecked") public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, EventListener<CheckpointBarrier> checkpointListener, @@ -133,6 +137,9 @@ public class StreamInputProcessor<IN> { if (isFinished) { return false; } + if (numRecordsIn == null) { + numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn"); + } while (true) { if (currentRecordDeserializer != null) { @@ -166,6 +173,7 @@ public class StreamInputProcessor<IN> { // now we can do the actual processing StreamRecord<IN> record = recordOrWatermark.asRecord(); synchronized (lock) { + numRecordsIn.inc(); streamOperator.setKeyContextElement1(record); streamOperator.processElement(record); } @@ -211,9 +219,12 @@ public class StreamInputProcessor<IN> { * @param metrics metric group */ public void setMetricGroup(IOMetricGroup metrics) { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - deserializer.instantiateMetrics(metrics); - } + metrics.gauge("currentLowWatermark", new Gauge<Long>() { + @Override + public Long getValue() { + return lastEmittedWatermark; + } + }); } public void cleanup() throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 733e7fb..1a66934 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; @@ -287,9 +288,12 @@ public class StreamTwoInputProcessor<IN1, IN2> { * @param metrics metric group */ public void setMetricGroup(IOMetricGroup metrics) { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - deserializer.instantiateMetrics(metrics); - } + metrics.gauge("currentLowWatermark", new Gauge<Long>() { + @Override + public Long getValue() { + return Math.min(lastEmittedWatermark1, lastEmittedWatermark2); + } + }); } public void cleanup() throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 90abea4..761aa37 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -298,14 +299,17 @@ public class OperatorChain<OUT> { private static class ChainingOutput<T> implements Output<StreamRecord<T>> { protected final OneInputStreamOperator<T, ?> operator; + protected final Counter numRecordsIn; public ChainingOutput(OneInputStreamOperator<T, ?> operator) { this.operator = operator; + this.numRecordsIn = operator.getMetricGroup().counter("numRecordsIn"); } @Override public void collect(StreamRecord<T> record) { try { + numRecordsIn.inc(); operator.setKeyContextElement1(record); operator.processElement(record); } @@ -347,6 +351,7 @@ public class OperatorChain<OUT> { @Override public void collect(StreamRecord<T> record) { try { + numRecordsIn.inc(); StreamRecord<T> copy = record.copy(serializer.copy(record.getValue())); operator.setKeyContextElement1(copy); operator.processElement(copy); http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index af9278f..7ae99f6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S @Override protected void run() throws Exception { - headOperator.run(getCheckpointLock(), getHeadOutput()); + headOperator.run(getCheckpointLock()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index af7d3f9..58e3cb8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -56,13 +57,27 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> { LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID); - this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime); + this.headOperator = new RecordPusher<>(); + this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime)); } private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> { private static final long serialVersionUID = 1L; + @Override + public void processElement(StreamRecord<IN> record) throws Exception { + output.collect(record); + } + + @Override + public void processWatermark(Watermark mark) { + // ignore + } + } + + private static class IterationTailOutput<IN> implements Output<StreamRecord<IN>> { + @SuppressWarnings("NonSerializableFieldInSerializableClass") private final BlockingQueue<StreamRecord<IN>> dataChannel; @@ -70,25 +85,32 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> { private final boolean shouldWait; - RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) { + IterationTailOutput(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) { this.dataChannel = dataChannel; this.iterationWaitTime = iterationWaitTime; this.shouldWait = iterationWaitTime > 0; } @Override - public void processElement(StreamRecord<IN> record) throws Exception { - if (shouldWait) { - dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS); - } - else { - dataChannel.put(record); + public void emitWatermark(Watermark mark) { + } + + @Override + public void collect(StreamRecord<IN> record) { + try { + if (shouldWait) { + dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS); + } + else { + dataChannel.put(record); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @Override - public void processWatermark(Watermark mark) { - // ignore + public void close() { } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index a771c85..444245c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -154,6 +155,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> private long recoveryTimestamp; + private long lastCheckpointSize = 0; + // ------------------------------------------------------------------------ // Life cycle methods for specific implementations // ------------------------------------------------------------------------ @@ -194,6 +197,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // allow trigger tasks to be removed if all timers for that timestamp are removed by user timerService.setRemoveOnCancelPolicy(true); + getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() { + @Override + public Long getValue() { + return StreamTask.this.lastCheckpointSize; + } + }); + // task specific initialization init(); @@ -538,6 +548,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> if (allStates.isEmpty()) { getEnvironment().acknowledgeCheckpoint(checkpointId); } else if (!hasAsyncStates) { + this.lastCheckpointSize = allStates.getStateSize(); getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); } else { // start a Thread that does the asynchronous materialization and @@ -572,6 +583,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } } StreamTaskStateList allStates = new StreamTaskStateList(states); + StreamTask.this.lastCheckpointSize = allStates.getStateSize(); getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName()); }