This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c1398f057c452529e7d2e8f494bff1bf900b2f32 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Wed Mar 18 17:07:15 2020 +0100 [FLINK-16316][operators] Move inner CountingOutput class out from AbstractStreamOperator --- .../api/operators/AbstractStreamOperator.java | 44 --------------- .../streaming/api/operators/CountingOutput.java | 65 ++++++++++++++++++++++ 2 files changed, 65 insertions(+), 44 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index c2604b2..6507ce9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -29,7 +29,6 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.fs.CloseableRegistry; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; @@ -52,7 +51,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.LatencyStats; -import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -513,48 +511,6 @@ public abstract class AbstractStreamOperator<OUT> this.output.emitLatencyMarker(marker); } - // ----------------------- Helper classes ----------------------- - - /** - * Wrapping {@link Output} that updates metrics on the number of emitted elements. 
- */ - public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> { - private final Output<StreamRecord<OUT>> output; - private final Counter numRecordsOut; - - public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) { - this.output = output; - this.numRecordsOut = counter; - } - - @Override - public void emitWatermark(Watermark mark) { - output.emitWatermark(mark); - } - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - output.emitLatencyMarker(latencyMarker); - } - - @Override - public void collect(StreamRecord<OUT> record) { - numRecordsOut.inc(); - output.collect(record); - } - - @Override - public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { - numRecordsOut.inc(); - output.collect(outputTag, record); - } - - @Override - public void close() { - output.close(); - } - } - // ------------------------------------------------------------------------ // Watermark handling // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java new file mode 100644 index 0000000..79acabb --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; + +/** + * Wrapping {@link Output} that updates metrics on the number of emitted elements. + */ +public class CountingOutput<OUT> implements Output<StreamRecord<OUT>> { + private final Output<StreamRecord<OUT>> output; + private final Counter numRecordsOut; + + public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) { + this.output = output; + this.numRecordsOut = counter; + } + + @Override + public void emitWatermark(Watermark mark) { + output.emitWatermark(mark); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + output.emitLatencyMarker(latencyMarker); + } + + @Override + public void collect(StreamRecord<OUT> record) { + numRecordsOut.inc(); + output.collect(record); + } + + @Override + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { + numRecordsOut.inc(); + output.collect(outputTag, record); + } + + @Override + public void close() { + output.close(); + } +}