This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch FLINK-19743 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 560f61e33a8307b83a989c5e6e656c4f45edb092 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Thu Oct 22 08:21:21 2020 +0800 [FLINK-19743][connectors/common] Add metric definitions of Source. (FLIP-33) - Expose the Source metrics via SourceMetricGroup and SourceReaderContext. - Let SourceOperator report the WatermarkLag. --- .../file/src/FileSourceHeavyThroughputTest.java | 8 +-- .../api/connector/source/SourceReaderContext.java | 4 +- .../source/metrics/SourceMetricGroup.java | 54 +++++++++++++++ .../source/lib/NumberSequenceSourceTest.java | 26 +++++++- .../source/mocks/MockSourceMetricGroup.java | 50 ++++++++++++++ .../connector/source/mocks/MockSourceReader.java | 12 +++- .../metrics/groups/SourceMetricGroupImpl.java | 76 ++++++++++++++++++++++ flink-streaming-java/pom.xml | 7 ++ .../streaming/api/operators/SourceOperator.java | 58 +++++++++++++++-- .../source/NoOpTimestampsAndWatermarks.java | 6 ++ .../source/ProgressiveTimestampsAndWatermarks.java | 14 +++- .../operators/source/TimestampsAndWatermarks.java | 7 ++ .../operators/source/WatermarkToDataOutput.java | 4 ++ .../api/operators/SourceOperatorTest.java | 27 ++++++++ .../api/operators/source/CollectingDataOutput.java | 4 +- .../operators/source/TestingSourceOperator.java | 6 +- ...tStreamTaskChainedSourcesCheckpointingTest.java | 8 +-- .../runtime/tasks/MultipleInputStreamTaskTest.java | 15 ++--- .../tasks/SourceOperatorStreamTaskTest.java | 3 +- .../source/reader/TestingReaderContext.java | 7 +- .../source/reader/TestingSourceMetricGroup.java | 48 ++++++++++++++ 21 files changed, 406 insertions(+), 38 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java index 9a43643..43a1efe 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java @@ -26,15 +26,15 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.metrics.SourceMetricGroup; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.file.src.reader.SimpleStreamFormat; import org.apache.flink.connector.file.src.reader.StreamFormat; import org.apache.flink.connector.file.src.testutils.TestingFileSystem; +import org.apache.flink.connector.testutils.source.reader.TestingSourceMetricGroup; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.InputStatus; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.junit.After; import org.junit.Test; @@ -186,8 +186,8 @@ public class FileSourceHeavyThroughputTest { private static final class NoOpReaderContext implements SourceReaderContext { @Override - public MetricGroup metricGroup() { - return new UnregisteredMetricsGroup(); + public SourceMetricGroup metricGroup() { + return new TestingSourceMetricGroup(); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java index f1ea255..2c5c696 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java @@ -19,8 +19,8 @@ package org.apache.flink.api.connector.source; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.metrics.SourceMetricGroup; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricGroup; /** * The class that expose some context from runtime to the {@link SourceReader}. @@ -31,7 +31,7 @@ public interface SourceReaderContext { /** * @return The metric group this source belongs to. */ - MetricGroup metricGroup(); + SourceMetricGroup metricGroup(); /** * Gets the configuration with which Flink was started. diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/metrics/SourceMetricGroup.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/metrics/SourceMetricGroup.java new file mode 100644 index 0000000..d6fc667 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/metrics/SourceMetricGroup.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; + +/** + * An interface that defines the metric names of the Source (FLIP-27). The interface + * also exposes the methods for the predefined metrics that need to be updated by + * the Source implementations. + */ +public interface SourceMetricGroup extends MetricGroup { + // Source metrics convention specified in FLIP-33. + String CURRENT_FETCH_EVENT_TIME_LAG = "currentFetchEventTimeLag"; + String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag"; + String WATERMARK_LAG = "watermarkLag"; + String SOURCE_IDLE_TIME = "sourceIdleTime"; + String PENDING_BYTES = "pendingBytes"; + String PENDING_RECORDS = "pendingRecords"; + String IO_NUM_RECORDS_IN_ERRORS = "numRecordsInErrors"; + + /** + * @return the counter to record number of records received by the SourceReader. + */ + Counter getNumRecordsInCounter(); + + /** + * @return the counter to record number of bytes received by the SourceReader. + */ + Counter getNumBytesInCounter(); + + /** + * @return the counter to record the errors encountered when reading the records. + */ + Counter getNumRecordsInErrorsCounter(); + +} diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java index 518ce86..cc961eb 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java @@ -24,9 +24,10 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.metrics.SourceMetricGroup; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputStatus; -import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.junit.Test; @@ -106,8 +107,8 @@ public class NumberSequenceSourceTest { private static final class DummyReaderContext implements SourceReaderContext { @Override - public MetricGroup metricGroup() { - return new UnregisteredMetricsGroup(); + public SourceMetricGroup metricGroup() { + return new DummySourceMetricsGroup(); } @Override @@ -168,4 +169,23 @@ public class NumberSequenceSourceTest { return emittedRecords; } } + + private static final class DummySourceMetricsGroup + extends UnregisteredMetricsGroup implements SourceMetricGroup { + + @Override + public Counter getNumRecordsInCounter() { + throw new UnsupportedOperationException(); + } + + @Override + public Counter getNumBytesInCounter() { + throw new UnsupportedOperationException(); + } + + @Override + public Counter getNumRecordsInErrorsCounter() { + throw new UnsupportedOperationException(); + } + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceMetricGroup.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceMetricGroup.java new file mode 100644 index 0000000..0b3cae4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceMetricGroup.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.mocks; + +import org.apache.flink.api.connector.source.metrics.SourceMetricGroup; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + +/** + * A testing implementation of {@link org.apache.flink.api.connector.source.metrics.SourceMetricGroup}. + * This class is not supposed to be used by classes out of the flink-core module. Please use + * TestingSourceMetricGroup in flink-connector-test-utils instead. + */ +public class MockSourceMetricGroup extends UnregisteredMetricsGroup implements SourceMetricGroup { + private final Counter numRecordsInCounter = new SimpleCounter(); + private final Counter numBytesInCounter = new SimpleCounter(); + private final Counter numRecordsInErrorsCounter = new SimpleCounter(); + + @Override + public Counter getNumRecordsInCounter() { + return numRecordsInCounter; + } + + @Override + public Counter getNumBytesInCounter() { + return numBytesInCounter; + } + + @Override + public Counter getNumRecordsInErrorsCounter() { + return numRecordsInErrorsCounter; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index 8eb4596..38805ad 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -18,6 +18,7 @@ package org.apache.flink.api.connector.source.mocks; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; @@ -43,6 +44,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> private boolean started; private boolean closed; private boolean waitingForMoreSplits; + private boolean emitWatermark; @GuardedBy("this") private CompletableFuture<Void> availableFuture; @@ -76,7 +78,11 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } // Read from the split with available record. if (currentSplitIndex < assignedSplits.size()) { - sourceOutput.collect(assignedSplits.get(currentSplitIndex).getNext(false)[0]); + int value = assignedSplits.get(currentSplitIndex).getNext(false)[0]; + sourceOutput.collect(value, value); + if (emitWatermark) { + sourceOutput.emitWatermark(new Watermark(value)); + } return InputStatus.MORE_AVAILABLE; } else if (finished) { // In case no split has available record, return depending on whether all the splits has finished. @@ -153,6 +159,10 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } } + public void enableWatermarkEmission() { + emitWatermark = true; + } + public boolean isStarted() { return started; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/SourceMetricGroupImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/SourceMetricGroupImpl.java new file mode 100644 index 0000000..d2b67bf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/SourceMetricGroupImpl.java @@ -0,0 +1,76 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.source.metrics.SourceMetricGroup; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.clock.SystemClock; + +import java.util.function.Supplier; + +/** + * A metric group that adds source specific metrics to the SourceOperator. + * It adds Source specific metrics to the OperatorMetricsGroup. + */ +public class SourceMetricGroupImpl extends ProxyMetricGroup<OperatorMetricGroup> implements SourceMetricGroup { + private final Counter numRecordsInCounter; + private final Counter numRecordsInErrorsCounter; + private final Counter numBytesIn; + private final Supplier<Long> currentWatermarkSupplier; + + public SourceMetricGroupImpl( + OperatorMetricGroup operatorMetricGroup, + Counter numBytesInCounter, + Supplier<Long> currentWatermarkSupplier) { + super(operatorMetricGroup); + this.numRecordsInCounter = operatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter(); + this.numRecordsInErrorsCounter = operatorMetricGroup.counter(SourceMetricGroup.IO_NUM_RECORDS_IN_ERRORS); + this.numBytesIn = numBytesInCounter; + this.currentWatermarkSupplier = currentWatermarkSupplier; + operatorMetricGroup.gauge( + SourceMetricGroup.WATERMARK_LAG, + () -> { + long currentWatermark = currentWatermarkSupplier.get(); + if (currentWatermark < 0) { + return -1L; + } else { + return SystemClock.getInstance().absoluteTimeMillis() - currentWatermark; + } + }); + } + + @Override + public Counter getNumRecordsInCounter() { + return numRecordsInCounter; + } + + public Counter getNumBytesInCounter() { + return numBytesIn; + } + + public Counter getNumRecordsInErrorsCounter() { + return numRecordsInErrorsCounter; + } + + @VisibleForTesting + public Supplier<Long> getCurrentWatermarkSupplier() { + return currentWatermarkSupplier; + } +} diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml index f0bd66b..94c1c16 100644 --- a/flink-streaming-java/pom.xml +++ b/flink-streaming-java/pom.xml @@ -90,6 +90,13 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-runtime_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 46839b3..5234805 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -28,10 +28,12 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.metrics.SourceMetricGroup; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.groups.SourceMetricGroupImpl; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; @@ -42,10 +44,13 @@ import org.apache.flink.runtime.source.event.RequestSplitEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.FunctionWithException; @@ -119,6 +124,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> * but we currently need to instantiate this lazily, because the metric groups exist only later. */ private TimestampsAndWatermarks<OUT> eventTimeLogic; + /** The metric group is initialized lazily at runtime. */ + private SourceMetricGroup sourceMetricGroup; + public SourceOperator( FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> readerFactory, OperatorEventGateway operatorEventGateway, @@ -128,6 +136,22 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> Configuration configuration, String localHostname, boolean emitProgressiveWatermarks) { + // The SourceMetricGorup is set to null here because it is set lazily in the setup(). + this(readerFactory, operatorEventGateway, splitSerializer, watermarkStrategy, + timeService, configuration, localHostname, emitProgressiveWatermarks, null); + } + + @VisibleForTesting + protected SourceOperator( + FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> readerFactory, + OperatorEventGateway operatorEventGateway, + SimpleVersionedSerializer<SplitT> splitSerializer, + WatermarkStrategy<OUT> watermarkStrategy, + ProcessingTimeService timeService, + Configuration configuration, + String localHostname, + boolean emitProgressiveWatermarks, + SourceMetricGroup sourceMetricGroup) { this.readerFactory = checkNotNull(readerFactory); this.operatorEventGateway = checkNotNull(operatorEventGateway); @@ -137,16 +161,35 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> this.configuration = checkNotNull(configuration); this.localHostname = checkNotNull(localHostname); this.emitProgressiveWatermarks = emitProgressiveWatermarks; + this.sourceMetricGroup = sourceMetricGroup; + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<OUT>> output) { + super.setup(containingTask, config, output); + // Reuse the numBytesInCounter from TaskMetricGroup. + sourceMetricGroup = new SourceMetricGroupImpl( + super.metrics, + super.metrics.parent().getIOMetricGroup().getNumBytesInCounter(), + () -> { + if (eventTimeLogic != null) { + return eventTimeLogic.getWatermark(); + } else { + return -1L; + } + }); } @Override public void open() throws Exception { - final MetricGroup metricGroup = getMetricGroup(); final SourceReaderContext context = new SourceReaderContext() { @Override - public MetricGroup metricGroup() { - return metricGroup; + public SourceMetricGroup metricGroup() { + return sourceMetricGroup; } @Override @@ -180,13 +223,13 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> if (emitProgressiveWatermarks) { eventTimeLogic = TimestampsAndWatermarks.createProgressiveEventTimeLogic( watermarkStrategy, - metricGroup, + sourceMetricGroup, getProcessingTimeService(), getExecutionConfig().getAutoWatermarkInterval()); } else { eventTimeLogic = TimestampsAndWatermarks.createNoOpEventTimeLogic( watermarkStrategy, - metricGroup); + sourceMetricGroup); } sourceReader = readerFactory.apply(context); @@ -218,6 +261,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> } @Override + public MetricGroup getMetricGroup() { + return sourceMetricGroup; + } + + @Override public InputStatus emitNext(DataOutput<OUT> output) throws Exception { // guarding an assumptions we currently make due to the fact that certain classes // assume a constant output diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java index f28ef55..d263bd6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java @@ -64,6 +64,12 @@ public class NoOpTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T // no periodic watermarks } + @Override + public long getWatermark() { + // No watermark. + return -1L; + } + // ------------------------------------------------------------------------ /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java index 1d67af2..5a11a01 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java @@ -65,6 +65,9 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater private StreamingReaderOutput<T> currentMainOutput; @Nullable + private WatermarkToDataOutput watermarkOutput; + + @Nullable private ScheduledFuture<?> periodicEmitHandle; public ProgressiveTimestampsAndWatermarks( @@ -98,7 +101,7 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater // support re-assigning the underlying output checkState(currentMainOutput == null && currentPerSplitOutputs == null, "already created a main output"); - final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(output); + watermarkOutput = new WatermarkToDataOutput(output); final WatermarkGenerator<T> watermarkGenerator = watermarksFactory.createWatermarkGenerator(watermarksContext); currentPerSplitOutputs = new SplitLocalOutputs<>( @@ -141,6 +144,15 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater } } + @Override + public long getWatermark() { + if (watermarkOutput != null) { + return watermarkOutput.getMaxWatermarkSoFar(); + } else { + return -1L; + } + } + void triggerPeriodicEmit(@SuppressWarnings("unused") long wallClockTimestamp) { if (currentPerSplitOutputs != null) { currentPerSplitOutputs.emitPeriodicWatermark(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java index 51068d2..d8320e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java @@ -63,6 +63,13 @@ public interface TimestampsAndWatermarks<T> { */ void stopPeriodicWatermarkEmits(); + /** + * Get the current watermark. + * + * @return the current watermark. + */ + long getWatermark(); + // ------------------------------------------------------------------------ // factories // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java index 3e3b255..8c26d7b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java @@ -83,4 +83,8 @@ public final class WatermarkToDataOutput implements WatermarkOutput { throw new ExceptionInChainedOperatorException(e); } } + + public long getMaxWatermarkSoFar() { + return maxWatermarkSoFar; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 78348d8..541d28c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.metrics.groups.SourceMetricGroupImpl; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -42,6 +43,7 @@ import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; @@ -179,6 +181,31 @@ public class SourceOperatorTest { assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0)); } + @Test + public void testMetrics() throws Exception { + StateInitializationContext stateContext = getStateContext(); + operator.initializeState(stateContext); + operator.open(); + mockSourceReader.enableWatermarkEmission(); + try { + MockSourceSplit newSplit = new MockSourceSplit(2); + newSplit.addRecord(100); + operator.handleOperatorEvent(new AddSplitEvent<>( + Collections.singletonList(newSplit), new MockSourceSplitSerializer())); + CollectingDataOutput<Integer> output = new CollectingDataOutput<>(); + SourceMetricGroupImpl metrics = + (SourceMetricGroupImpl) operator.getMetricGroup(); + assertEquals(-1L, (long) metrics.getCurrentWatermarkSupplier().get()); + operator.emitNext(output); + assertEquals("There should be a record and a watermark emission.", + 2, output.events.size()); + assertEquals(100L, (long) metrics.getCurrentWatermarkSupplier().get()); + } + finally { + operator.close(); + } + } + // ---------------- helper methods ------------------------- private StateInitializationContext getStateContext() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java index a536553..a1a3b20 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java @@ -30,9 +30,9 @@ import java.util.List; /** * A test utility implementation of {@link PushingAsyncDataInput.DataOutput} that collects all events. */ -final class CollectingDataOutput<E> implements PushingAsyncDataInput.DataOutput<E> { +public final class CollectingDataOutput<E> implements PushingAsyncDataInput.DataOutput<E> { - final List<Object> events = new ArrayList<>(); + public final List<Object> events = new ArrayList<>(); @Override public void emitWatermark(Watermark watermark) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index 46b1382..61d19b9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.testutils.source.reader.TestingSourceMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; @@ -36,7 +37,7 @@ import org.apache.flink.streaming.util.MockStreamingRuntimeContext; /** * A SourceOperator extension to simplify test setup. */ -public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> { +public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> { private static final long serialVersionUID = 1L; @@ -80,7 +81,8 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit timeService, new Configuration(), "localhost", - emitProgressiveWatermarks); + emitProgressiveWatermarks, + new TestingSourceMetricGroup()); this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java index 5f912ab..9e104d8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java @@ -137,9 +137,9 @@ public class MultipleInputStreamTaskChainedSourcesCheckpointingTest { Future<Boolean> checkpointFuture = testHarness.getStreamTask().triggerCheckpointAsync(metaData, barrier.getCheckpointOptions(), false); processSingleStepUntil(testHarness, checkpointFuture::isDone); - expectedOutput.add(new StreamRecord<>("42", TimestampAssigner.NO_TIMESTAMP)); - expectedOutput.add(new StreamRecord<>("42", TimestampAssigner.NO_TIMESTAMP)); - expectedOutput.add(new StreamRecord<>("42", TimestampAssigner.NO_TIMESTAMP)); + expectedOutput.add(new StreamRecord<>("42", 42L)); + expectedOutput.add(new StreamRecord<>("42", 42L)); + expectedOutput.add(new StreamRecord<>("42", 42L)); expectedOutput.add(new StreamRecord<>("44", TimestampAssigner.NO_TIMESTAMP)); expectedOutput.add(new StreamRecord<>("44", TimestampAssigner.NO_TIMESTAMP)); expectedOutput.add(new StreamRecord<>("47.0", TimestampAssigner.NO_TIMESTAMP)); @@ -202,7 +202,7 @@ public class MultipleInputStreamTaskChainedSourcesCheckpointingTest { addSourceRecords(testHarness, 0, 42, 43, 44); processSingleStepUntil(testHarness, () -> !testHarness.getOutput().isEmpty()); - expectedOutput.add(new StreamRecord<>("42", TimestampAssigner.NO_TIMESTAMP)); + expectedOutput.add(new StreamRecord<>("42", 42L)); CheckpointBarrier barrier = createBarrier(testHarness); Future<Boolean> checkpointFuture = testHarness.getStreamTask().triggerCheckpointAsync(metaData, barrier.getCheckpointOptions(), false); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index f3b963a..9259750 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -113,8 +112,8 @@ public class MultipleInputStreamTaskTest { ArrayDeque<Object> expectedOutput = new ArrayDeque<>(); addSourceRecords(testHarness, 1, 42, 43); - expectedOutput.add(new StreamRecord<>("42", TimestampAssigner.NO_TIMESTAMP)); - expectedOutput.add(new StreamRecord<>("43", TimestampAssigner.NO_TIMESTAMP)); + expectedOutput.add(new StreamRecord<>("42", 42)); + expectedOutput.add(new StreamRecord<>("43", 43)); testHarness.processElement(new StreamRecord<>("Hello", initialTime + 1), 0); expectedOutput.add(new StreamRecord<>("Hello", initialTime + 1)); testHarness.processElement(new StreamRecord<>(42.44d, initialTime + 3), 1); @@ -419,7 +418,7 @@ public class MultipleInputStreamTaskTest { testHarness.processElement(new Watermark(initialTime), 0, 1); addSourceRecords(testHarness, 1, initialTime); - expectedOutput.add(new StreamRecord<>("" + (initialTime), TimestampAssigner.NO_TIMESTAMP)); + expectedOutput.add(new StreamRecord<>("" + (initialTime), initialTime)); testHarness.processElement(new Watermark(initialTime), 1, 0); @@ -443,7 +442,7 @@ public class MultipleInputStreamTaskTest { testHarness.processElement(new Watermark(initialTime + 3), 0, 1); addSourceRecords(testHarness, 1, initialTime + 3); - expectedOutput.add(new StreamRecord<>("" + (initialTime + 3), TimestampAssigner.NO_TIMESTAMP)); + expectedOutput.add(new StreamRecord<>("" + (initialTime + 3), initialTime + 3)); testHarness.processElement(new Watermark(initialTime + 3), 1, 0); testHarness.processElement(new Watermark(initialTime + 2), 1, 1); @@ -463,7 +462,7 @@ public class MultipleInputStreamTaskTest { testHarness.processElement(new Watermark(initialTime + 4), 0, 1); addSourceRecords(testHarness, 1, initialTime + 4); - expectedOutput.add(new StreamRecord<>("" + (initialTime + 4), TimestampAssigner.NO_TIMESTAMP)); + expectedOutput.add(new StreamRecord<>("" + (initialTime + 4), initialTime + 4)); testHarness.processElement(new Watermark(initialTime + 4), 1, 0); expectedOutput.add(new Watermark(initialTime + 4)); @@ -518,7 +517,7 @@ public class MultipleInputStreamTaskTest { // we wake up the source and emit watermark addSourceRecords(testHarness, 1, initialTime + 5); testHarness.processAll(); - expectedOutput.add(new StreamRecord<>("" + (initialTime + 5), TimestampAssigner.NO_TIMESTAMP)); + expectedOutput.add(new StreamRecord<>("" + (initialTime + 5), initialTime + 5)); expectedOutput.add(new Watermark(initialTime + 5)); // the source should go back to being idle immediately, but AbstractStreamOperatorV2 // should have updated it's watermark by then. @@ -534,7 +533,7 @@ public class MultipleInputStreamTaskTest { // make source active once again, emit a watermark and go idle again. addSourceRecords(testHarness, 1, initialTime + 10); - expectedOutput.add(new StreamRecord<>("" + (initialTime + 10), TimestampAssigner.NO_TIMESTAMP)); + expectedOutput.add(new StreamRecord<>("" + (initialTime + 10), initialTime + 10)); expectedOutput.add(StreamStatus.ACTIVE); expectedOutput.add(StreamStatus.IDLE); testHarness.processAll(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index ca2ac10..5be45d4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.connector.source.Boundedness; @@ -143,7 +142,7 @@ public class SourceOperatorStreamTaskTest { // Build expected output to verify the results Queue<Object> expectedOutput = new LinkedList<>(); - expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r, TimestampAssigner.NO_TIMESTAMP))); + expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r, r))); // Add barrier to the expected output. expectedOutput.add(new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions)); diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java index 1fda64a..50df6a3 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java @@ -20,9 +20,8 @@ package org.apache.flink.connector.testutils.source.reader; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.metrics.SourceMetricGroup; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import java.util.ArrayList; import java.util.List; @@ -32,7 +31,7 @@ import java.util.List; */ public class TestingReaderContext implements SourceReaderContext { - private final UnregisteredMetricsGroup metrics = new UnregisteredMetricsGroup(); + private final TestingSourceMetricGroup metrics = new TestingSourceMetricGroup(); private final Configuration config; @@ -51,7 +50,7 @@ public class TestingReaderContext implements SourceReaderContext { // ------------------------------------------------------------------------ @Override - public MetricGroup metricGroup() { + public SourceMetricGroup metricGroup() { return metrics; } diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingSourceMetricGroup.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingSourceMetricGroup.java new file mode 100644 index 0000000..3dc33c2 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingSourceMetricGroup.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testutils.source.reader; + +import org.apache.flink.api.connector.source.metrics.SourceMetricGroup; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + +/** + * A testing implementation of {@link SourceMetricGroup}. + */ +public class TestingSourceMetricGroup extends UnregisteredMetricsGroup implements SourceMetricGroup { + private final Counter numRecordsInCounter = new SimpleCounter(); + private final Counter numBytesInCounter = new SimpleCounter(); + private final Counter numRecordsInErrorsCounter = new SimpleCounter(); + + @Override + public Counter getNumRecordsInCounter() { + return numRecordsInCounter; + } + + @Override + public Counter getNumBytesInCounter() { + return numBytesInCounter; + } + + @Override + public Counter getNumRecordsInErrorsCounter() { + return numRecordsInErrorsCounter; + } +}
