This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dfeae1b652bf268aadce6df1e98c377e98e56655 Author: Arvid Heise <[email protected]> AuthorDate: Wed Jun 23 17:08:16 2021 +0200 [FLINK-20888][runtime] Close outputs in OperatorChain. OperatorChain creates and owns the outputs, so it should also be responsible for closing them. Specific operators should not close the outputs. Also, ChainingOutput should never close the chained operator, since it does not own the operator. --- .../source/ContinuousFileReaderOperator.java | 1 - .../streaming/runtime/tasks/ChainingOutput.java | 15 ++---- .../streaming/runtime/tasks/OperatorChain.java | 32 +++++++----- .../flink/streaming/runtime/tasks/StreamTask.java | 2 +- .../tasks/WatermarkGaugeExposingOutput.java | 4 +- .../StreamSourceOperatorLatencyMetricsTest.java | 2 +- flink-tests/pom.xml | 9 +++- .../source/ContinuousFileReaderOperatorITCase.java | 61 ++++++++++++++++++++++ 8 files changed, 97 insertions(+), 29 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index f6149e9..0df0be2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -499,7 +499,6 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit> RunnableWithException[] runClose = { () -> sourceContext.close(), - () -> output.close(), () -> format.close(), () -> { if (this.format instanceof RichInputFormat) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java index 64f1cce..49d541e 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java @@ -43,20 +43,17 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> protected final Counter numRecordsIn; protected final WatermarkGauge watermarkGauge = new WatermarkGauge(); @Nullable protected final OutputTag<T> outputTag; - @Nullable protected final AutoCloseable closeable; protected StreamStatus announcedStatus = StreamStatus.ACTIVE; public ChainingOutput(OneInputStreamOperator<T, ?> operator, @Nullable OutputTag<T> outputTag) { - this(operator, (OperatorMetricGroup) operator.getMetricGroup(), outputTag, operator::close); + this(operator, (OperatorMetricGroup) operator.getMetricGroup(), outputTag); } public ChainingOutput( Input<T> input, OperatorMetricGroup operatorMetricGroup, - @Nullable OutputTag<T> outputTag, - @Nullable AutoCloseable closeable) { + @Nullable OutputTag<T> outputTag) { this.input = input; - this.closeable = closeable; { Counter tmpNumRecordsIn; @@ -129,13 +126,7 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> @Override public void close() { - try { - if (closeable != null) { - closeable.close(); - } - } catch (Exception e) { - throw new ExceptionInChainedOperatorException(e); - } + // nothing is owned by ChainingOutput and should be closed, see FLINK-20888 } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 25270bb..5bfef4e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -54,11 +54,14 @@ import org.apache.flink.util.FlinkException; import 
org.apache.flink.util.OutputTag; import org.apache.flink.util.SerializedValue; +import org.apache.flink.shaded.guava18.com.google.common.io.Closer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -85,7 +88,8 @@ import static org.apache.flink.util.Preconditions.checkState; * operator. */ @Internal -public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements BoundedMultiInput { +public class OperatorChain<OUT, OP extends StreamOperator<OUT>> + implements BoundedMultiInput, Closeable { private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class); @@ -128,6 +132,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Bound private boolean ignoreEndOfInput; + private final Closer closer = Closer.create(); + public OperatorChain( StreamTask<OUT, OP> containingTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) { @@ -353,7 +359,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Bound * Chained sources are closed when {@link * org.apache.flink.streaming.runtime.io.StreamTaskSourceInput} are being closed. */ - return new ChainingOutput<>(input, metricGroup, outputTag, null); + return closer.register(new ChainingOutput(input, metricGroup, outputTag)); } public OperatorEventDispatcher getOperatorEventDispatcher() { @@ -497,10 +503,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Bound * * <p>This method should never fail. 
*/ - public void releaseOutputs() { - for (RecordWriterOutput<?> streamOutput : streamOutputs) { - streamOutput.close(); - } + public void close() throws IOException { + closer.close(); } @Nullable @@ -564,9 +568,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Bound // If the chaining output does not copy we need to copy in the broadcast output, // otherwise multi-chaining would not work correctly. if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return new CopyingBroadcastingOutputCollector<>(asArray); + return closer.register(new CopyingBroadcastingOutputCollector<>(asArray)); } else { - return new BroadcastingOutputCollector<>(asArray); + return closer.register(new BroadcastingOutputCollector<>(asArray)); } } } @@ -669,7 +673,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Bound MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue); - return currentOperatorOutput; + return closer.register(currentOperatorOutput); } private RecordWriterOutput<OUT> createStreamOutput( @@ -679,7 +683,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Bound Environment taskEnvironment) { OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput - TypeSerializer outSerializer = null; + TypeSerializer outSerializer; if (edge.getOutputTag() != null) { // side output @@ -694,8 +698,12 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Bound taskEnvironment.getUserCodeClassLoader().asClassLoader()); } - return new RecordWriterOutput<>( - recordWriter, outSerializer, sideOutputTag, edge.supportsUnalignedCheckpoints()); + return closer.register( + new RecordWriterOutput<OUT>( + recordWriter, + outSerializer, + sideOutputTag, + edge.supportsUnalignedCheckpoints())); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index fb10349..16fe10a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -832,7 +832,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab if (operatorChain != null) { // beware: without synchronization, #performCheckpoint() may run in // parallel and this call is not thread-safe - actionExecutor.run(() -> operatorChain.releaseOutputs()); + actionExecutor.run(() -> operatorChain.close()); } else { // failed to allocate operatorChain, clean up record writers recordWriter.close(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/WatermarkGaugeExposingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/WatermarkGaugeExposingOutput.java index 14c6468..77e56f9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/WatermarkGaugeExposingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/WatermarkGaugeExposingOutput.java @@ -21,11 +21,13 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; +import java.io.Closeable; + /** * An {@link Output} that measures the last emitted watermark with a {@link WatermarkGauge}. * * @param <T> The type of the elements that can be emitted. 
*/ -public interface WatermarkGaugeExposingOutput<T> extends Output<T> { +public interface WatermarkGaugeExposingOutput<T> extends Output<T>, Closeable { Gauge<Long> getWatermarkGauge(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java index b89a92d..597d7fb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java @@ -192,7 +192,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends TestLogger { operator.run(new Object(), new CollectorOutput<>(output), operatorChain); operator.close(); } finally { - operatorChain.releaseOutputs(); + operatorChain.close(); } assertEquals( diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 0c51b7c..1d771bd 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -46,6 +46,13 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-files</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${project.version}</version> <scope>test</scope> @@ -259,7 +266,7 @@ under the License. 
<artifactId>reflections</artifactId> </dependency> - </dependencies> + </dependencies> <build> <plugins> diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java new file mode 100644 index 0000000..dc15abf --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.streaming.api.functions.source; + +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; + +import org.apache.commons.io.FileUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.nio.charset.StandardCharsets; + +/** + * ITCases for {@link org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator}. + */ +public class ContinuousFileReaderOperatorITCase { + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + /** Tests https://issues.apache.org/jira/browse/FLINK-20888. */ + @Test + public void testChainedOperatorsAreNotPrematurelyClosed() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + File input = temp.newFile("input"); + FileUtils.write(input, "test", StandardCharsets.UTF_8); + DataStream<String> stream = env.readTextFile(input.getAbsolutePath()); + final FileSink<String> sink = + FileSink.forRowFormat( + new Path(temp.newFolder("output").getAbsolutePath()), + new SimpleStringEncoder<String>()) + .withOutputFileConfig(OutputFileConfig.builder().build()) + .withRollingPolicy( + DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024).build()) + .build(); + stream.sinkTo(sink); + env.execute("test"); + } +}
