This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83a26d519261f3d7607a2e25ce77e5f2c10b11f2
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Jun 23 17:08:16 2021 +0200

    [FLINK-20888][runtime] Close outputs in OperatorChain.
    
    OperatorChain creates and owns the outputs, so it should also close them.
    Individual operators should not close the outputs.
    
    Also, ChainingOutput should never close the chained operator, since it
    does not own that operator.
---
 .../source/ContinuousFileReaderOperator.java       |  1 -
 .../streaming/runtime/tasks/ChainingOutput.java    | 16 ++----
 .../streaming/runtime/tasks/OperatorChain.java     | 36 +++++++------
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../tasks/WatermarkGaugeExposingOutput.java        |  4 +-
 .../StreamSourceOperatorLatencyMetricsTest.java    |  2 +-
 flink-tests/pom.xml                                |  9 +++-
 .../source/ContinuousFileReaderOperatorITCase.java | 61 ++++++++++++++++++++++
 8 files changed, 97 insertions(+), 34 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index b26a9d4..8d19ace 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -500,7 +500,6 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
 
         RunnableWithException[] runClose = {
             () -> sourceContext.close(),
-            () -> output.close(),
             () -> format.close(),
             () -> {
                 if (this.format instanceof RichInputFormat) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 47e0a2c..e1fbcef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -44,7 +44,6 @@ class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>>
     protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
     protected final StreamStatusProvider streamStatusProvider;
     @Nullable protected final OutputTag<T> outputTag;
-    @Nullable protected final AutoCloseable closeable;
 
     public ChainingOutput(
             OneInputStreamOperator<T, ?> operator,
@@ -54,18 +53,15 @@ class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>>
                 operator,
                 (OperatorMetricGroup) operator.getMetricGroup(),
                 streamStatusProvider,
-                outputTag,
-                operator::close);
+                outputTag);
     }
 
     public ChainingOutput(
             Input<T> input,
             OperatorMetricGroup operatorMetricGroup,
             StreamStatusProvider streamStatusProvider,
-            @Nullable OutputTag<T> outputTag,
-            @Nullable AutoCloseable closeable) {
+            @Nullable OutputTag<T> outputTag) {
         this.input = input;
-        this.closeable = closeable;
 
         {
             Counter tmpNumRecordsIn;
@@ -138,13 +134,7 @@ class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>>
 
     @Override
     public void close() {
-        try {
-            if (closeable != null) {
-                closeable.close();
-            }
-        } catch (Exception e) {
-            throw new ExceptionInChainedOperatorException(e);
-        }
+        // nothing is owned by ChainingOutput and should be closed, see 
FLINK-20888
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 6bfdeec..d7bd22f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -56,11 +56,14 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.SerializedValue;
 
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -88,7 +91,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 @Internal
 public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
-        implements StreamStatusMaintainer, BoundedMultiInput {
+        implements StreamStatusMaintainer, BoundedMultiInput, Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OperatorChain.class);
 
@@ -129,6 +132,8 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
      */
     private StreamStatus streamStatus = StreamStatus.ACTIVE;
 
+    private final Closer closer = Closer.create();
+
     public OperatorChain(
             StreamTask<OUT, OP> containingTask,
             RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> 
recordWriterDelegate) {
@@ -354,7 +359,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
          * Chained sources are closed when {@link
          * org.apache.flink.streaming.runtime.io.StreamTaskSourceInput} are 
being closed.
          */
-        return new ChainingOutput<>(input, metricGroup, this, outputTag, null);
+        return closer.register(new ChainingOutput(input, metricGroup, this, 
outputTag));
     }
 
     @Override
@@ -514,10 +519,8 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
      *
      * <p>This method should never fail.
      */
-    public void releaseOutputs() {
-        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-            streamOutput.close();
-        }
+    public void close() throws IOException {
+        closer.close();
     }
 
     @Nullable
@@ -581,9 +584,9 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
             // If the chaining output does not copy we need to copy in the 
broadcast output,
             // otherwise multi-chaining would not work correctly.
             if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-                return new CopyingBroadcastingOutputCollector<>(asArray, this);
+                return closer.register(new 
CopyingBroadcastingOutputCollector<>(asArray, this));
             } else {
-                return new BroadcastingOutputCollector<>(asArray, this);
+                return closer.register(new 
BroadcastingOutputCollector<>(asArray, this));
             }
         }
     }
@@ -687,7 +690,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
                         MetricNames.IO_CURRENT_INPUT_WATERMARK,
                         currentOperatorOutput.getWatermarkGauge()::getValue);
 
-        return currentOperatorOutput;
+        return closer.register(currentOperatorOutput);
     }
 
     private RecordWriterOutput<OUT> createStreamOutput(
@@ -697,7 +700,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
             Environment taskEnvironment) {
         OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return 
null if not sideOutput
 
-        TypeSerializer outSerializer = null;
+        TypeSerializer outSerializer;
 
         if (edge.getOutputTag() != null) {
             // side output
@@ -712,12 +715,13 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
                             
taskEnvironment.getUserCodeClassLoader().asClassLoader());
         }
 
-        return new RecordWriterOutput<>(
-                recordWriter,
-                outSerializer,
-                sideOutputTag,
-                this,
-                edge.supportsUnalignedCheckpoints());
+        return closer.register(
+                new RecordWriterOutput<OUT>(
+                        recordWriter,
+                        outSerializer,
+                        sideOutputTag,
+                        this,
+                        edge.supportsUnalignedCheckpoints()));
     }
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4a1a99b..1c30618 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -763,7 +763,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         if (operatorChain != null) {
             // beware: without synchronization, #performCheckpoint() may run in
             //         parallel and this call is not thread-safe
-            actionExecutor.run(() -> operatorChain.releaseOutputs());
+            actionExecutor.run(() -> operatorChain.close());
         } else {
             // failed to allocate operatorChain, clean up record writers
             recordWriter.close();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/WatermarkGaugeExposingOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/WatermarkGaugeExposingOutput.java
index 14c6468..77e56f9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/WatermarkGaugeExposingOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/WatermarkGaugeExposingOutput.java
@@ -21,11 +21,13 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
+import java.io.Closeable;
+
 /**
  * An {@link Output} that measures the last emitted watermark with a {@link 
WatermarkGauge}.
  *
  * @param <T> The type of the elements that can be emitted.
  */
-public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
+public interface WatermarkGaugeExposingOutput<T> extends Output<T>, Closeable {
     Gauge<Long> getWatermarkGauge();
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
index 8494dcf..adccaa1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -197,7 +197,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends 
TestLogger {
                     operatorChain);
             operator.close();
         } finally {
-            operatorChain.releaseOutputs();
+            operatorChain.close();
         }
 
         assertEquals(
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index ffe6687..b2e02ca 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -46,6 +46,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-files</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-core</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
@@ -243,7 +250,7 @@ under the License.
                        <artifactId>reflections</artifactId>
                </dependency>
 
-       </dependencies>
+    </dependencies>
 
        <build>
                <plugins>
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
new file mode 100644
index 0000000..dc15abf
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.functions.source;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * ITCases for {@link 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator}.
+ */
+public class ContinuousFileReaderOperatorITCase {
+    @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+    /** Tests https://issues.apache.org/jira/browse/FLINK-20888. */
+    @Test
+    public void testChainedOperatorsAreNotPrematurelyClosed() throws Exception 
{
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        File input = temp.newFile("input");
+        FileUtils.write(input, "test", StandardCharsets.UTF_8);
+        DataStream<String> stream = env.readTextFile(input.getAbsolutePath());
+        final FileSink<String> sink =
+                FileSink.forRowFormat(
+                                new 
Path(temp.newFolder("output").getAbsolutePath()),
+                                new SimpleStringEncoder<String>())
+                        
.withOutputFileConfig(OutputFileConfig.builder().build())
+                        .withRollingPolicy(
+                                
DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024).build())
+                        .build();
+        stream.sinkTo(sink);
+        env.execute("test");
+    }
+}

Reply via email to