This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 541f43026203b56c0501a33ed54271270ac3e085
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jun 25 15:32:56 2021 +0200

    [FLINK-22972][datastream] Remove StreamOperator#dispose in favour of close 
and finish
    
    This commit cleans up the StreamOperator API with regard to the termination 
phase and introduces a clean finish() method for flushing all records without 
releasing resources.
    
    The StreamOperator#close method is supposed to flush all records, but 
currently it also closes all resources, including connections to external 
systems. We need separate methods for flushing and closing resources because 
we might still need the connections when performing the final checkpoint, once 
all records are flushed. Moreover, the logic for closing resources is 
duplicated in the StreamOperator#dispose method.
    
    This closes #16351
---
 .../flink/state/api/output/BoundedStreamTask.java  |   2 +-
 .../operators/StateBootstrapWrapperOperator.java   |   8 +-
 .../flink/state/api/output/SnapshotUtilsTest.java  |   8 +-
 .../python/AbstractPythonFunctionOperator.java     |  20 ++--
 .../PythonTimestampsAndWatermarksOperator.java     |   4 +-
 ...stractArrowPythonAggregateFunctionOperator.java |   4 +-
 ...tBatchArrowPythonAggregateFunctionOperator.java |   4 +-
 .../RowDataArrowPythonScalarFunctionOperator.java  |  14 +--
 .../source/ContinuousFileReaderOperator.java       | 104 ++++++++++-----------
 .../api/operators/AbstractStreamOperator.java      |  23 +----
 .../api/operators/AbstractStreamOperatorV2.java    |  23 +----
 .../api/operators/AbstractUdfStreamOperator.java   |  13 ---
 .../streaming/api/operators/SourceOperator.java    |  18 +---
 .../streaming/api/operators/StreamOperator.java    |  26 +++---
 .../streaming/api/operators/StreamSource.java      |  22 +++--
 .../api/operators/collect/CollectSinkOperator.java |   4 +-
 .../runtime/operators/GenericWriteAheadSink.java   |   1 +
 .../operators/TimestampsAndWatermarksOperator.java |   4 +-
 .../windowing/EvictingWindowOperator.java          |   6 --
 .../operators/windowing/WindowOperator.java        |   9 --
 .../streaming/runtime/tasks/OperatorChain.java     |   6 +-
 .../runtime/tasks/StreamOperatorWrapper.java       |  60 ++++++------
 .../flink/streaming/runtime/tasks/StreamTask.java  |  33 ++++---
 .../AbstractUdfStreamOperatorLifecycleTest.java    |  23 ++---
 .../api/operators/SourceOperatorTest.java          |  12 ---
 .../ContinuousFileProcessingRescalingTest.java     |   2 +-
 .../StreamSourceOperatorLatencyMetricsTest.java    |   2 +-
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  43 ++-------
 .../runtime/tasks/OneInputStreamTaskTest.java      |   8 +-
 .../runtime/tasks/SourceStreamTaskTest.java        |  15 ++-
 .../runtime/tasks/StreamOperatorWrapperTest.java   |  26 +++---
 .../streaming/runtime/tasks/StreamTaskTest.java    |  48 +++++-----
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   4 +-
 .../tasks/TestBoundedOneInputStreamOperator.java   |  10 +-
 .../runtime/tasks/TestBoundedTwoInputOperator.java |  11 ++-
 .../runtime/tasks/TwoInputStreamTaskTest.java      |   6 +-
 .../util/AbstractStreamOperatorTestHarness.java    |   6 +-
 .../util/TestBoundedMultipleInputOperator.java     |  74 ---------------
 .../filesystem/stream/AbstractStreamingWriter.java |   4 +-
 .../runtime/operators/TableStreamOperator.java     |  16 ----
 .../window/LocalSlicingWindowAggOperator.java      |  15 ---
 .../bundle/AbstractMapBundleOperator.java          |  38 ++++----
 .../temporal/TemporalProcessTimeJoinOperator.java  |   1 +
 .../join/temporal/TemporalRowTimeJoinOperator.java |   1 +
 .../operators/join/window/WindowJoinOperator.java  |  17 ----
 .../MultipleInputStreamOperatorBase.java           |  18 ++--
 .../runtime/operators/sort/StreamSortOperator.java |   6 +-
 .../runtime/operators/window/WindowOperator.java   |  23 -----
 .../window/slicing/SlicingWindowOperator.java      |  15 ---
 .../ProcTimeMiniBatchAssignerOperator.java         |   5 -
 .../RowTimeMiniBatchAssginerOperator.java          |   4 +-
 .../wmassigners/WatermarkAssignerOperator.java     |  17 +---
 .../BatchMultipleInputStreamOperatorTest.java      |  31 ------
 .../TestingOneInputStreamOperator.java             |  10 --
 .../TestingTwoInputStreamOperator.java             |  10 --
 .../test/streaming/runtime/TimestampITCase.java    |   4 +-
 56 files changed, 314 insertions(+), 627 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index ee30d4f..257bc70 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -106,6 +106,7 @@ class BoundedStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> & Bo
             mainOperator.processElement(streamRecord);
         } else {
             mainOperator.endInput();
+            mainOperator.finish();
             controller.suspendDefaultAction();
             mailboxProcessor.suspend();
         }
@@ -117,7 +118,6 @@ class BoundedStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> & Bo
     @Override
     protected void cleanup() throws Exception {
         mainOperator.close();
-        mainOperator.dispose();
     }
 
     private static class CollectorWrapper<OUT> implements 
Output<StreamRecord<OUT>> {
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
index 245a7d4..92eb6d3 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
@@ -105,13 +105,13 @@ public final class StateBootstrapWrapperOperator<
     }
 
     @Override
-    public void close() throws Exception {
-        operator.close();
+    public void finish() throws Exception {
+        operator.finish();
     }
 
     @Override
-    public void dispose() throws Exception {
-        operator.dispose();
+    public void close() throws Exception {
+        operator.close();
     }
 
     @Override
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index 1178654..8430888 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -74,13 +74,13 @@ public class SnapshotUtilsTest {
         }
 
         @Override
-        public void close() throws Exception {
-            ACTUAL_ORDER_TRACKING.add("close");
+        public void finish() throws Exception {
+            ACTUAL_ORDER_TRACKING.add("finish");
         }
 
         @Override
-        public void dispose() throws Exception {
-            ACTUAL_ORDER_TRACKING.add("dispose");
+        public void close() throws Exception {
+            ACTUAL_ORDER_TRACKING.add("close");
         }
 
         @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 387c14f..7abb6c1 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -139,22 +139,16 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         try {
             invokeFinishBundle();
         } finally {
-            super.close();
-
-            try {
-                cleanUpLeakingClasses(this.getClass().getClassLoader());
-            } catch (Throwable t) {
-                LOG.warn("Failed to clean up the leaking objects.", t);
-            }
+            super.finish();
         }
     }
 
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {
         try {
             if (checkFinishBundleTimer != null) {
                 checkFinishBundleTimer.cancel(true);
@@ -165,7 +159,13 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
                 pythonFunctionRunner = null;
             }
         } finally {
-            super.dispose();
+            super.close();
+
+            try {
+                cleanUpLeakingClasses(this.getClass().getClassLoader());
+            } catch (Throwable t) {
+                LOG.warn("Failed to clean up the leaking objects.", t);
+            }
         }
     }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java
index 996aa74..57aea3d 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java
@@ -181,8 +181,8 @@ public class PythonTimestampsAndWatermarksOperator<IN>
     }
 
     @Override
-    public void close() throws Exception {
-        super.close();
+    public void finish() throws Exception {
+        super.finish();
         watermarkGenerator.onPeriodicEmit(watermarkOutput);
     }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
index b6fe695..a7c63cf 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
@@ -107,8 +107,8 @@ public abstract class 
AbstractArrowPythonAggregateFunctionOperator
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
+    public void close() throws Exception {
+        super.close();
         if (arrowSerializer != null) {
             arrowSerializer.close();
             arrowSerializer = null;
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java
index ecbf2a9..4442cbd 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java
@@ -100,9 +100,9 @@ abstract class 
AbstractBatchArrowPythonAggregateFunctionOperator
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         invokeCurrentBatch();
-        super.close();
+        super.finish();
     }
 
     protected abstract void invokeCurrentBatch() throws Exception;
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
index b5193b5..cc2eca4 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
@@ -87,18 +87,18 @@ public class RowDataArrowPythonScalarFunctionOperator
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (arrowSerializer != null) {
-            arrowSerializer.close();
-            arrowSerializer = null;
-        }
+    public void finish() throws Exception {
+        invokeCurrentBatch();
+        super.finish();
     }
 
     @Override
     public void close() throws Exception {
-        invokeCurrentBatch();
         super.close();
+        if (arrowSerializer != null) {
+            arrowSerializer.close();
+            arrowSerializer = null;
+        }
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 0df0be2..5c29092 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -82,8 +82,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <ol>
  *   <li>if {@link ReaderState#IDLE IDLE} then close immediately
- *   <li>otherwise switch to {@link ReaderState#CLOSING CLOSING}, call {@link
- *       MailboxExecutor#yield() yield} in a loop until state is {@link 
ReaderState#CLOSED CLOSED}
+ *   <li>otherwise switch to {@link ReaderState#FINISHING CLOSING}, call {@link
+ *       MailboxExecutor#yield() yield} in a loop until state is {@link 
ReaderState#FINISHED CLOSED}
  *   <li>{@link MailboxExecutor#yield() yield()} causes remaining records (and 
splits) to be
  *       processed in the same way as above
  * </ol>
@@ -139,7 +139,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
             }
         },
         /**
-         * No further processing can be done; only state disposal transition 
to {@link #CLOSED}
+         * No further processing can be done; only state disposal transition 
to {@link #FINISHED}
          * allowed.
          */
         FAILED {
@@ -153,7 +153,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
          * {@link #close()} was called but unprocessed data (records and 
splits) remains and needs
          * to be processed. {@link #close()} caller is blocked.
          */
-        CLOSING {
+        FINISHING {
             @Override
             public <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
                     ContinuousFileReaderOperator<?, T> op) throws IOException {
@@ -168,10 +168,10 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
                 // need one more mail to unblock possible yield() in close() 
method (todo: wait with
                 // timeout in yield)
                 op.enqueueProcessRecord();
-                op.switchState(CLOSED);
+                op.switchState(FINISHED);
             }
         },
-        CLOSED {
+        FINISHED {
             @Override
             public <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
                     ContinuousFileReaderOperator<?, T> op) {
@@ -186,12 +186,12 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
 
         static {
             Map<ReaderState, Set<ReaderState>> tmpTransitions = new 
HashMap<>();
-            tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED, FAILED));
-            tmpTransitions.put(OPENING, EnumSet.of(READING, CLOSING, FAILED));
-            tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, CLOSING, 
FAILED));
-            tmpTransitions.put(CLOSING, EnumSet.of(CLOSED, FAILED));
-            tmpTransitions.put(FAILED, EnumSet.of(CLOSED));
-            tmpTransitions.put(CLOSED, EnumSet.noneOf(ReaderState.class));
+            tmpTransitions.put(IDLE, EnumSet.of(OPENING, FINISHED, FAILED));
+            tmpTransitions.put(OPENING, EnumSet.of(READING, FINISHING, 
FAILED));
+            tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, FINISHING, 
FAILED));
+            tmpTransitions.put(FINISHING, EnumSet.of(FINISHED, FAILED));
+            tmpTransitions.put(FAILED, EnumSet.of(FINISHED));
+            tmpTransitions.put(FINISHED, EnumSet.noneOf(ReaderState.class));
             VALID_TRANSITIONS = new EnumMap<>(tmpTransitions);
         }
 
@@ -200,7 +200,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
         }
 
         public final boolean isTerminal() {
-            return this == CLOSED;
+            return this == FINISHED;
         }
 
         public boolean canSwitchTo(ReaderState next) {
@@ -302,7 +302,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
 
         this.state = ReaderState.IDLE;
         if (this.format instanceof RichInputFormat) {
-            ((RichInputFormat) 
this.format).setRuntimeContext(getRuntimeContext());
+            ((RichInputFormat<?, ?>) 
this.format).setRuntimeContext(getRuntimeContext());
         }
         this.format.configure(new Configuration());
 
@@ -380,7 +380,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
 
     private void readAndCollectRecord() throws IOException {
         Preconditions.checkState(
-                state == ReaderState.READING || state == ReaderState.CLOSING,
+                state == ReaderState.READING || state == ReaderState.FINISHING,
                 "can't process record in state %s",
                 state);
         if (format.reachedEnd()) {
@@ -394,14 +394,14 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
 
     private void loadSplit(T split) throws IOException {
         Preconditions.checkState(
-                state != ReaderState.READING && state != ReaderState.CLOSED,
+                state != ReaderState.READING && state != ReaderState.FINISHED,
                 "can't load split in state %s",
                 state);
         Preconditions.checkNotNull(split, "split is null");
         LOG.debug("load split: {}", split);
         currentSplit = split;
         if (this.format instanceof RichInputFormat) {
-            ((RichInputFormat) this.format).openInputFormat();
+            ((RichInputFormat<?, ?>) this.format).openInputFormat();
         }
         if (format instanceof CheckpointableInputFormat && 
currentSplit.getSplitState() != null) {
             // recovering after a node failure with an input
@@ -436,14 +436,38 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
     }
 
     @Override
-    public void dispose() throws Exception {
+    public void finish() throws Exception {
+        LOG.debug("finishing");
+        super.finish();
+
+        switch (state) {
+            case IDLE:
+                switchState(ReaderState.FINISHED);
+                break;
+            case FINISHED:
+                LOG.warn("operator is already closed, doing nothing");
+                return;
+            default:
+                switchState(ReaderState.FINISHING);
+                while (!state.isTerminal()) {
+                    executor.yield();
+                }
+        }
+
+        try {
+            sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+        } catch (Exception e) {
+            LOG.warn("unable to emit watermark while closing", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
         Exception e = null;
-        if (state != ReaderState.CLOSED) {
-            try {
-                cleanUp();
-            } catch (Exception ex) {
-                e = ex;
-            }
+        try {
+            cleanUp();
+        } catch (Exception ex) {
+            e = ex;
         }
         {
             checkpointedState = null;
@@ -457,7 +481,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
             splits = null;
         }
         try {
-            super.dispose();
+            super.close();
         } catch (Exception ex) {
             e = ExceptionUtils.firstOrSuppressed(ex, e);
         }
@@ -466,34 +490,6 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
         }
     }
 
-    @Override
-    public void close() throws Exception {
-        LOG.debug("closing");
-        super.close();
-
-        switch (state) {
-            case IDLE:
-                switchState(ReaderState.CLOSED);
-                break;
-            case CLOSED:
-                LOG.warn("operator is already closed, doing nothing");
-                return;
-            default:
-                switchState(ReaderState.CLOSING);
-                while (!state.isTerminal()) {
-                    executor.yield();
-                }
-        }
-
-        try {
-            sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
-        } catch (Exception e) {
-            LOG.warn("unable to emit watermark while closing", e);
-        }
-
-        cleanUp();
-    }
-
     private void cleanUp() throws Exception {
         LOG.debug("cleanup, state={}", state);
 
@@ -502,7 +498,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
             () -> format.close(),
             () -> {
                 if (this.format instanceof RichInputFormat) {
-                    ((RichInputFormat) this.format).closeInputFormat();
+                    ((RichInputFormat<?, ?>) this.format).closeInputFormat();
                 }
             }
         };
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 23da8de..15faaf7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -322,30 +322,11 @@ public abstract class AbstractStreamOperator<OUT>
     @Override
     public void open() throws Exception {}
 
-    /**
-     * This method is called after all records have been added to the 
operators via the methods
-     * {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link
-     * TwoInputStreamOperator#processElement1(StreamRecord)} and {@link
-     * TwoInputStreamOperator#processElement2(StreamRecord)}.
-     *
-     * <p>The method is expected to flush all remaining buffered data. 
Exceptions during this
-     * flushing of buffered should be propagated, in order to cause the 
operation to be recognized
-     * asa failed, because the last data items are not processed properly.
-     *
-     * @throws Exception An exception in this method causes the operator to 
fail.
-     */
     @Override
-    public void close() throws Exception {}
+    public void finish() throws Exception {}
 
-    /**
-     * This method is called at the very end of the operator's life, both in 
the case of a
-     * successful completion of the operation, and in the case of a failure 
and canceling.
-     *
-     * <p>This method is expected to make a thorough effort to release all 
resources that the
-     * operator has acquired.
-     */
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {
         if (stateHandler != null) {
             stateHandler.dispose();
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 702b753..28b2c44 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -266,30 +266,11 @@ public abstract class AbstractStreamOperatorV2<OUT>
     @Override
     public void open() throws Exception {}
 
-    /**
-     * This method is called after all records have been added to the 
operators via the methods
-     * {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link
-     * TwoInputStreamOperator#processElement1(StreamRecord)} and {@link
-     * TwoInputStreamOperator#processElement2(StreamRecord)}.
-     *
-     * <p>The method is expected to flush all remaining buffered data. 
Exceptions during this
-     * flushing of buffered should be propagated, in order to cause the 
operation to be recognized
-     * asa failed, because the last data items are not processed properly.
-     *
-     * @throws Exception An exception in this method causes the operator to 
fail.
-     */
     @Override
-    public void close() throws Exception {}
+    public void finish() throws Exception {}
 
-    /**
-     * This method is called at the very end of the operator's life, both in 
the case of a
-     * successful completion of the operation, and in the case of a failure 
and canceling.
-     *
-     * <p>This method is expected to make a thorough effort to release all 
resources that the
-     * operator has acquired.
-     */
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {
         if (stateHandler != null) {
             stateHandler.dispose();
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 1ece8e7..d41eb95 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -53,9 +53,6 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
     /** The user function. */
     protected final F userFunction;
 
-    /** Flag to prevent duplicate function.close() calls in close() and 
dispose(). */
-    private transient boolean functionsClosed = false;
-
     public AbstractUdfStreamOperator(F userFunction) {
         this.userFunction = requireNonNull(userFunction);
         checkUdfCheckpointingPreconditions();
@@ -105,19 +102,9 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
     @Override
     public void close() throws Exception {
         super.close();
-        functionsClosed = true;
         FunctionUtils.closeFunction(userFunction);
     }
 
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (!functionsClosed) {
-            functionsClosed = true;
-            FunctionUtils.closeFunction(userFunction);
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  checkpointing and recovery
     // ------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 835ad5a..cc2fc60 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -127,9 +127,6 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
      */
     private TimestampsAndWatermarks<OUT> eventTimeLogic;
 
-    /** Indicating whether the source operator has been closed. */
-    private boolean closed;
-
     public SourceOperator(
             FunctionWithException<SourceReaderContext, SourceReader<OUT, 
SplitT>, Exception>
                     readerFactory,
@@ -263,24 +260,19 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         if (eventTimeLogic != null) {
             eventTimeLogic.stopPeriodicWatermarkEmits();
         }
-        if (sourceReader != null) {
-            sourceReader.close();
-        }
-        closed = true;
-        super.close();
+        super.finish();
     }
 
     @Override
-    public void dispose() throws Exception {
-        // We also need to close the source reader to make sure the resources
-        // are released if the task does not finish normally.
-        if (!closed && sourceReader != null) {
+    public void close() throws Exception {
+        if (sourceReader != null) {
             sourceReader.close();
         }
+        super.close();
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index c4a90be..7c522fa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Disposable;
 
 import java.io.Serializable;
 
@@ -44,8 +43,7 @@ import java.io.Serializable;
  * @param <OUT> The output type of the operator
  */
 @PublicEvolving
-public interface StreamOperator<OUT>
-        extends CheckpointListener, KeyContext, Disposable, Serializable {
+public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, 
Serializable {
 
     // ------------------------------------------------------------------------
     //  life cycle
@@ -64,21 +62,21 @@ public interface StreamOperator<OUT>
     void open() throws Exception;
 
     /**
-     * This method is called after all records have been added to the 
operators via the methods
-     * {@link
-     * 
org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)},
-     * or {@link
-     * 
org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)}
-     * and {@link
-     * 
org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
+     * This method is called at the end of data processing.
      *
      * <p>The method is expected to flush all remaining buffered data. 
Exceptions during this
      * flushing of buffered should be propagated, in order to cause the 
operation to be recognized
      * as failed, because the last data items are not processed properly.
      *
+     * <p><b>After this method is called, no more records can be produced for 
the downstream
+     * operators.</b>
+     *
+     * <p><b>NOTE:</b> This method does not need to close any resources. You 
should release external
+     * resources in the {@link #close()} method.
+     *
      * @throws java.lang.Exception An exception in this method causes the 
operator to fail.
      */
-    void close() throws Exception;
+    void finish() throws Exception;
 
     /**
      * This method is called at the very end of the operator's life, both in 
the case of a
@@ -86,9 +84,11 @@ public interface StreamOperator<OUT>
      *
      * <p>This method is expected to make a thorough effort to release all 
resources that the
      * operator has acquired.
+     *
+     * <p><b>NOTE:</b> This method cannot emit any records! If you need to emit records 
at the end of
+     * processing, do so in the {@link #finish()} method.
      */
-    @Override
-    void dispose() throws Exception;
+    void close() throws Exception;
 
     // ------------------------------------------------------------------------
     //  state snapshots
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 4957049..fef7f28 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -132,18 +132,20 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
     }
 
     @Override
+    public void finish() throws Exception {
+        super.finish();
+        if (!isCanceledOrStopped() && ctx != null) {
+            advanceToEndOfEventTime();
+        }
+    }
+
+    @Override
     public void close() throws Exception {
-        try {
-            super.close();
-            if (!isCanceledOrStopped() && ctx != null) {
-                advanceToEndOfEventTime();
-            }
-        } finally {
-            // make sure that the context is closed in any case
-            if (ctx != null) {
-                ctx.close();
-            }
+        // make sure that the context is closed in any case
+        if (ctx != null) {
+            ctx.close();
         }
+        super.close();
     }
 
     public void cancel() {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
index 5c84c9b..8b78bc3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
@@ -51,9 +51,9 @@ public class CollectSinkOperator<IN> extends StreamSink<IN> 
implements OperatorE
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         sinkFunction.accumulateFinalResults();
-        super.close();
+        super.finish();
     }
 
     public CompletableFuture<OperatorID> getOperatorIdFuture() {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4d007bb..4627c7d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -137,6 +137,7 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
 
     public void close() throws Exception {
         committer.close();
+        super.close();
     }
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index b2d3501..2e4a7be 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -127,8 +127,8 @@ public class TimestampsAndWatermarksOperator<T> extends 
AbstractStreamOperator<T
     }
 
     @Override
-    public void close() throws Exception {
-        super.close();
+    public void finish() throws Exception {
+        super.finish();
         watermarkGenerator.onPeriodicEmit(wmOutput);
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 24fbd31..16e7aac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -485,12 +485,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
         evictorContext = null;
     }
 
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        evictorContext = null;
-    }
-
     // ------------------------------------------------------------------------
     // Getters for testing
     // ------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index f3f30a4..57bbfd1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -288,15 +288,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        timestampedCollector = null;
-        triggerContext = null;
-        processContext = null;
-        windowAssignerContext = null;
-    }
-
-    @Override
     public void processElement(StreamRecord<IN> element) throws Exception {
         final Collection<W> elementWindows =
                 windowAssigner.assignWindows(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 5bfef4e..d1a35b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -406,7 +406,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
     /**
      * Initialize state and open all operators in the chain from <b>tail to 
heads</b>, contrary to
      * {@link StreamOperator#close()} which happens <b>heads to tail</b> (see 
{@link
-     * #closeOperators(StreamTaskActionExecutor)}).
+     * #finishOperators(StreamTaskActionExecutor)}).
      */
     protected void initializeStateAndOpenOperators(
             StreamTaskStateInitializer streamTaskStateInitializer) throws 
Exception {
@@ -422,9 +422,9 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
      * operator in the chain, contrary to {@link StreamOperator#open()} which 
happens <b>tail to
      * heads</b> (see {@link 
#initializeStateAndOpenOperators(StreamTaskStateInitializer)}).
      */
-    protected void closeOperators(StreamTaskActionExecutor actionExecutor) 
throws Exception {
+    protected void finishOperators(StreamTaskActionExecutor actionExecutor) 
throws Exception {
         if (firstOperatorWrapper != null) {
-            firstOperatorWrapper.close(actionExecutor, ignoreEndOfInput);
+            firstOperatorWrapper.finish(actionExecutor, ignoreEndOfInput);
         }
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
index 401bc89..1b29fbc 100755
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
@@ -39,7 +39,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * This class handles the close, endInput and other related logic of a {@link 
StreamOperator}. It
  * also automatically propagates the close operation to the next wrapper that 
the {@link #next}
  * points to, so we can use {@link #next} to link all operator wrappers in the 
operator chain and
- * close all operators only by calling the {@link 
#close(StreamTaskActionExecutor, boolean)} method
+ * finish all operators only by calling the {@link 
#finish(StreamTaskActionExecutor, boolean)} method
  * of the header operator wrapper.
  */
 @Internal
@@ -120,7 +120,7 @@ public class StreamOperatorWrapper<OUT, OP extends 
StreamOperator<OUT>> {
      * MailboxExecutor#yield()} to take the mails of closing operator and 
running timers and run
      * them.
      */
-    public void close(StreamTaskActionExecutor actionExecutor, boolean 
isStoppingBySyncSavepoint)
+    public void finish(StreamTaskActionExecutor actionExecutor, boolean 
isStoppingBySyncSavepoint)
             throws Exception {
         if (!isHead && !isStoppingBySyncSavepoint) {
             // NOTE: This only do for the case where the operator is one-input 
operator. At present,
@@ -128,68 +128,63 @@ public class StreamOperatorWrapper<OUT, OP extends 
StreamOperator<OUT>> {
             actionExecutor.runThrowing(() -> endOperatorInput(1));
         }
 
-        quiesceTimeServiceAndCloseOperator(actionExecutor);
+        quiesceTimeServiceAndFinishOperator(actionExecutor);
 
         // propagate the close operation to the next wrapper
         if (next != null) {
-            next.close(actionExecutor, isStoppingBySyncSavepoint);
+            next.finish(actionExecutor, isStoppingBySyncSavepoint);
         }
     }
 
-    private void quiesceTimeServiceAndCloseOperator(StreamTaskActionExecutor 
actionExecutor)
+    private void quiesceTimeServiceAndFinishOperator(StreamTaskActionExecutor 
actionExecutor)
             throws InterruptedException, ExecutionException {
 
         // step 1. to ensure that there is no longer output triggered by the 
timers before invoking
-        // the "close()"
-        //         method of the operator, we quiesce the processing time 
service to prevent the
-        // pending timers
-        //         from firing, but wait the timers in running to finish
-        // step 2. invoke the "close()" method of the operator. executing the 
close operation must
-        // be deferred
-        //         to the mailbox to ensure that mails already in the mailbox 
are finished before
-        // closing the
-        //         operator
+        // the "finish()" method of the operator, we quiesce the processing 
time service to prevent
+        // the pending timers from firing, but wait for the running timers to 
finish
+        // step 2. invoke the "finish()" method of the operator. executing the 
finish operation must
+        // be deferred to the mailbox to ensure that mails already in the 
mailbox are finished
+        // before finishing the operator
         // step 3. send a closed mail to ensure that the mails that are from 
the operator and still
-        // in the mailbox
-        //         are completed before exiting the following mailbox 
processing loop
-        CompletableFuture<Void> closedFuture =
+        // in the mailbox are completed before exiting the following mailbox 
processing loop
+        CompletableFuture<Void> finishedFuture =
                 quiesceProcessingTimeService()
-                        .thenCompose(unused -> 
deferCloseOperatorToMailbox(actionExecutor))
-                        .thenCompose(unused -> sendClosedMail());
+                        .thenCompose(unused -> 
deferFinishOperatorToMailbox(actionExecutor))
+                        .thenCompose(unused -> sendFinishedMail());
 
         // run the mailbox processing loop until all operations are finished
-        while (!closedFuture.isDone()) {
+        while (!finishedFuture.isDone()) {
             while (mailboxExecutor.tryYield()) {}
 
             // we wait a little bit to avoid unnecessary CPU occupation due to 
empty loops,
             // such as when all mails of the operator have been processed but 
the closed future
             // has not been set to completed state
             try {
-                closedFuture.get(1, TimeUnit.MILLISECONDS);
+                finishedFuture.get(1, TimeUnit.MILLISECONDS);
             } catch (TimeoutException ex) {
                 // do nothing
             }
         }
 
         // expose the exception thrown when closing
-        closedFuture.get();
+        finishedFuture.get();
     }
 
-    private CompletableFuture<Void> deferCloseOperatorToMailbox(
+    private CompletableFuture<Void> deferFinishOperatorToMailbox(
             StreamTaskActionExecutor actionExecutor) {
-        final CompletableFuture<Void> closeOperatorFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> finishOperatorFuture = new 
CompletableFuture<>();
 
         mailboxExecutor.execute(
                 () -> {
                     try {
-                        closeOperator(actionExecutor);
-                        closeOperatorFuture.complete(null);
+                        finishOperator(actionExecutor);
+                        finishOperatorFuture.complete(null);
                     } catch (Throwable t) {
-                        closeOperatorFuture.completeExceptionally(t);
+                        finishOperatorFuture.completeExceptionally(t);
                     }
                 },
                 "StreamOperatorWrapper#closeOperator for " + wrapped);
-        return closeOperatorFuture;
+        return finishOperatorFuture;
     }
 
     private CompletableFuture<Void> quiesceProcessingTimeService() {
@@ -198,19 +193,20 @@ public class StreamOperatorWrapper<OUT, OP extends 
StreamOperator<OUT>> {
                 .orElse(CompletableFuture.completedFuture(null));
     }
 
-    private CompletableFuture<Void> sendClosedMail() {
+    private CompletableFuture<Void> sendFinishedMail() {
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
         mailboxExecutor.execute(
-                () -> future.complete(null), 
"StreamOperatorWrapper#sendClosedMail for " + wrapped);
+                () -> future.complete(null),
+                "StreamOperatorWrapper#sendFinishedMail for " + wrapped);
         return future;
     }
 
-    private void closeOperator(StreamTaskActionExecutor actionExecutor) throws 
Exception {
+    private void finishOperator(StreamTaskActionExecutor actionExecutor) 
throws Exception {
         actionExecutor.runThrowing(
                 () -> {
                     closed = true;
-                    wrapped.close();
+                    wrapped.finish();
                 });
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a0326981..8ff3446 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -236,7 +236,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
      */
     private volatile boolean failing;
 
-    private boolean disposedOperators;
+    private boolean closedOperators;
 
     /** Thread pool for async snapshot workers. */
     private final ExecutorService asyncOperationsThreadPool;
@@ -506,7 +506,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
 
     /**
      * Instructs the task to go through its normal termination routine, i.e. 
exit the run-loop and
-     * call {@link StreamOperator#close()} and {@link 
StreamOperator#dispose()} on its operators.
+     * call {@link StreamOperator#finish()} and {@link StreamOperator#close()} 
on its operators.
      *
      * <p>This is used by the source task to get out of the run-loop when the 
job is stopped with a
      * savepoint.
@@ -552,7 +552,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
             LOG.debug("Re-restore attempt rejected.");
             return;
         }
-        disposedOperators = false;
+        closedOperators = false;
         LOG.debug("Initializing {}.", getName());
 
         operatorChain = new OperatorChain<>(this, recordWriter);
@@ -698,7 +698,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         final CompletableFuture<Void> timersFinishedFuture = new 
CompletableFuture<>();
 
         // close all operators in a chain effect way
-        operatorChain.closeOperators(actionExecutor);
+        operatorChain.finishOperators(actionExecutor);
 
         // If checkpoints are enabled, waits for all the records get processed 
by the downstream
         // tasks. During this process, this task could coordinate with its 
downstream tasks to
@@ -758,7 +758,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
 
         // make an attempt to dispose the operators such that failures in the 
dispose call
         // still let the computation fail
-        disposeAllOperators();
+        closeAllOperators();
     }
 
     protected void cleanUpInvoke() throws Exception {
@@ -789,8 +789,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         suppressedException = runAndSuppressThrowable(this::cleanup, 
suppressedException);
 
         // if the operators were not disposed before, do a hard dispose
-        suppressedException =
-                runAndSuppressThrowable(this::disposeAllOperators, 
suppressedException);
+        suppressedException = runAndSuppressThrowable(this::closeAllOperators, 
suppressedException);
 
         // release the output resources. this method should never fail.
         suppressedException =
@@ -888,24 +887,24 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
     }
 
     /**
-     * Execute @link StreamOperator#dispose()} of each operator in the chain 
of this {@link
-     * StreamTask}. Disposing happens from <b>tail to head</b> operator in the 
chain.
+     * Execute {@link StreamOperator#close()} of each operator in the chain of 
this {@link
+     * StreamTask}. Closing happens from <b>tail to head</b> operator in the 
chain.
      */
-    private void disposeAllOperators() throws Exception {
-        if (operatorChain != null && !disposedOperators) {
-            Exception disposalException = null;
+    private void closeAllOperators() throws Exception {
+        if (operatorChain != null && !closedOperators) {
+            Exception closingException = null;
             for (StreamOperatorWrapper<?, ?> operatorWrapper :
                     operatorChain.getAllOperators(true)) {
                 StreamOperator<?> operator = 
operatorWrapper.getStreamOperator();
                 try {
-                    operator.dispose();
+                    operator.close();
                 } catch (Exception e) {
-                    disposalException = firstOrSuppressed(e, 
disposalException);
+                    closingException = firstOrSuppressed(e, closingException);
                 }
             }
-            disposedOperators = true;
-            if (disposalException != null) {
-                throw disposalException;
+            closedOperators = true;
+            if (closingException != null) {
+                throw closingException;
             }
         }
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 8697353..6daaee8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -67,9 +67,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                     "UDF::run",
                     "OPERATOR::prepareSnapshotPreBarrier",
                     "OPERATOR::snapshotState",
+                    "OPERATOR::finish",
                     "OPERATOR::close",
-                    "UDF::close",
-                    "OPERATOR::dispose");
+                    "UDF::close");
 
     private static final List<String> EXPECTED_CALL_ORDER_CANCEL_RUNNING =
             Arrays.asList(
@@ -82,13 +82,13 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                     "UDF::run",
                     "OPERATOR::cancel",
                     "UDF::cancel",
-                    "OPERATOR::dispose",
+                    "OPERATOR::close",
                     "UDF::close");
 
     private static final String ALL_METHODS_STREAM_OPERATOR =
             "["
                     + "close[], "
-                    + "dispose[], "
+                    + "finish[], "
                     + "getCurrentKey[], "
                     + "getMetricGroup[], "
                     + "getOperatorID[], "
@@ -320,6 +320,12 @@ public class AbstractUdfStreamOperatorLifecycleTest {
         }
 
         @Override
+        public void finish() throws Exception {
+            ACTUAL_ORDER_TRACKING.add("OPERATOR::finish");
+            super.finish();
+        }
+
+        @Override
         public void close() throws Exception {
             ACTUAL_ORDER_TRACKING.add("OPERATOR::close");
             super.close();
@@ -330,14 +336,5 @@ public class AbstractUdfStreamOperatorLifecycleTest {
             ACTUAL_ORDER_TRACKING.add("OPERATOR::cancel");
             super.cancel();
         }
-
-        @Override
-        public void dispose() throws Exception {
-            ACTUAL_ORDER_TRACKING.add("OPERATOR::dispose");
-            super.dispose();
-            if (simulateCheckpointing) {
-                testCheckpointer.join();
-            }
-        }
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 7c835f0..a518c33 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -95,7 +95,6 @@ public class SourceOperatorTest {
     @After
     public void cleanUp() throws Exception {
         operator.close();
-        operator.dispose();
         assertTrue(mockSourceReader.isClosed());
     }
 
@@ -187,17 +186,6 @@ public class SourceOperatorTest {
         assertEquals(100L, (long) 
mockSourceReader.getAbortedCheckpoints().get(0));
     }
 
-    @Test
-    public void testDisposeAfterCloseOnlyClosesReaderOnce() throws Exception {
-        // Initialize the operator.
-        operator.initializeState(getStateContext());
-        // Open the operator.
-        operator.open();
-        operator.close();
-        operator.dispose();
-        assertEquals(1, mockSourceReader.getTimesClosed());
-    }
-
     // ---------------- helper methods -------------------------
 
     private StateInitializationContext getStateContext() throws Exception {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
index 64405d4..70a8bb7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
@@ -294,7 +294,7 @@ public class ContinuousFileProcessingRescalingTest {
             while (!getFormat().isLastProcessed()) {
                 mailboxProcessor.runMailboxStep();
             }
-            getHarness().close();
+            harness.getOperator().finish();
         }
 
         @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
index 597d7fb..3e80b1a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -190,7 +190,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends 
TestLogger {
                                 new MockEnvironmentBuilder().build()));
         try {
             operator.run(new Object(), new CollectorOutput<>(output), 
operatorChain);
-            operator.close();
+            operator.finish();
         } finally {
             operatorChain.close();
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 71be94f..5be8ae25 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -73,7 +73,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.WatermarkMetricOperator;
-import org.apache.flink.streaming.util.TestBoundedMultipleInputOperator;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.SerializedValue;
 
@@ -473,10 +472,11 @@ public class MultipleInputStreamTaskTest {
                         
LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT,
                         
LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT,
                         
LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT,
-                        LifeCycleTrackingMockSourceReader.CLOSE,
-                        
LifeCycleTrackingMapToStringMultipleInputOperator.CLOSE,
+                        
LifeCycleTrackingMapToStringMultipleInputOperator.FINISH,
                         LifeCycleTrackingMap.END_INPUT,
-                        LifeCycleTrackingMap.CLOSE));
+                        LifeCycleTrackingMap.CLOSE,
+                        
LifeCycleTrackingMapToStringMultipleInputOperator.CLOSE,
+                        LifeCycleTrackingMockSourceReader.CLOSE));
     }
 
     @Test
@@ -999,35 +999,6 @@ public class MultipleInputStreamTaskTest {
         }
     }
 
-    private static class TestBoundedMultipleInputOperatorFactory
-            extends AbstractStreamOperatorFactory<String> {
-        @Override
-        public <T extends StreamOperator<String>> T createStreamOperator(
-                StreamOperatorParameters<String> parameters) {
-            return (T) new TestBoundedMultipleInputOperator("Operator0", 
parameters);
-        }
-
-        @Override
-        public Class<? extends StreamOperator<String>> getStreamOperatorClass(
-                ClassLoader classLoader) {
-            return TestBoundedMultipleInputOperator.class;
-        }
-    }
-
-    private static class DuplicatingOperatorFactory extends 
AbstractStreamOperatorFactory<String> {
-        @Override
-        public <T extends StreamOperator<String>> T createStreamOperator(
-                StreamOperatorParameters<String> parameters) {
-            return (T) new DuplicatingOperator(parameters);
-        }
-
-        @Override
-        public Class<? extends StreamOperator<String>> getStreamOperatorClass(
-                ClassLoader classLoader) {
-            return DuplicatingOperator.class;
-        }
-    }
-
     /** Factory for {@link MapToStringMultipleInputOperator}. */
     protected static class MapToStringMultipleInputOperatorFactory
             extends AbstractStreamOperatorFactory<String> {
@@ -1138,6 +1109,7 @@ public class MultipleInputStreamTaskTest {
             extends MapToStringMultipleInputOperator implements 
BoundedMultiInput {
         public static final String OPEN = "MultipleInputOperator#open";
         public static final String CLOSE = "MultipleInputOperator#close";
+        public static final String FINISH = "MultipleInputOperator#finish";
         public static final String END_INPUT = 
"MultipleInputOperator#endInput";
 
         private static final long serialVersionUID = 1L;
@@ -1163,6 +1135,11 @@ public class MultipleInputStreamTaskTest {
         public void endInput(int inputId) {
             LIFE_CYCLE_EVENTS.add(END_INPUT);
         }
+
+        @Override
+        public void finish() throws Exception {
+            LIFE_CYCLE_EVENTS.add(FINISH);
+        }
     }
 
     static class LifeCycleTrackingMapToStringMultipleInputOperatorFactory
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index c845e1d..c5c653b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -714,9 +714,11 @@ public class OneInputStreamTaskTest extends TestLogger {
                 expected,
                 new StreamRecord<>("Hello"),
                 new StreamRecord<>("[Operator0]: End of input"),
-                new StreamRecord<>("[Operator0]: Bye"),
+                new StreamRecord<>("[Operator0]: Finish"),
                 new StreamRecord<>("[Operator1]: End of input"),
-                new StreamRecord<>("[Operator1]: Bye"));
+                new StreamRecord<>("[Operator1]: Finish"),
+                new StreamRecord<>("[Operator1]: Bye"),
+                new StreamRecord<>("[Operator0]: Bye"));
 
         final Object[] output = testHarness.getOutput().toArray();
         assertArrayEquals("Output was not correct.", expected.toArray(), output);
@@ -733,7 +735,7 @@ public class OneInputStreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
 
             // verify that the timer service is still running
             Assert.assertTrue(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 607d848..287f86f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -267,9 +267,11 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
                 expected,
                 new StreamRecord<>("Hello"),
                 new StreamRecord<>("[Source0]: End of input"),
-                new StreamRecord<>("[Source0]: Bye"),
+                new StreamRecord<>("[Source0]: Finish"),
                 new StreamRecord<>("[Operator1]: End of input"),
-                new StreamRecord<>("[Operator1]: Bye"));
+                new StreamRecord<>("[Operator1]: Finish"),
+                new StreamRecord<>("[Operator1]: Bye"),
+                new StreamRecord<>("[Source0]: Bye"));
 
         final Object[] output = testHarness.getOutput().toArray();
         assertArrayEquals("Output was not correct.", expected.toArray(), output);
@@ -311,6 +313,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         }
 
         expectedOutput.add(new StreamRecord<>("Hello"));
+        expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));
 
         TestHarnessUtil.assertOutputEquals(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -919,12 +922,18 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         }
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
             ProcessingTimeService timeService = getProcessingTimeService();
             timeService.registerTimer(
                     timeService.getCurrentProcessingTime(),
                     t -> output("[" + name + "]: Timer registered in close"));
 
+            output("[" + name + "]: Finish");
+            super.finish();
+        }
+
+        @Override
+        public void close() throws Exception {
             output("[" + name + "]: Bye");
             super.close();
         }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
index 640efe0..7725f3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
@@ -131,9 +131,9 @@ public class StreamOperatorWrapperTest extends TestLogger {
     }
 
     @Test
-    public void testClose() throws Exception {
+    public void testFinish() throws Exception {
         output.clear();
-        operatorWrappers.get(0).close(containingTask.getActionExecutor(), false);
+        operatorWrappers.get(0).finish(containingTask.getActionExecutor(), false);
 
         List<Object> expected = new ArrayList<>();
         for (int i = 0; i < operatorWrappers.size(); i++) {
@@ -143,7 +143,7 @@ public class StreamOperatorWrapperTest extends TestLogger {
                     prefix + ": End of input",
                     prefix + ": Timer that was in mailbox before closing operator",
                     prefix + ": Bye",
-                    prefix + ": Mail to put in mailbox when closing operator");
+                    prefix + ": Mail to put in mailbox when finishing operator");
         }
 
         assertArrayEquals(
@@ -153,12 +153,12 @@ public class StreamOperatorWrapperTest extends TestLogger {
     }
 
     @Test
-    public void testClosingOperatorWithException() {
-        AbstractStreamOperator streamOperator =
+    public void testFinishingOperatorWithException() {
+        AbstractStreamOperator<Void> streamOperator =
                 new AbstractStreamOperator<Void>() {
                     @Override
-                    public void close() throws Exception {
-                        throw new Exception("test exception at closing");
+                    public void finish() throws Exception {
+                        throw new Exception("test exception at finishing");
                     }
                 };
 
@@ -172,11 +172,11 @@ public class StreamOperatorWrapperTest extends TestLogger {
                         true);
 
         try {
-            operatorWrapper.close(containingTask.getActionExecutor(), false);
+            operatorWrapper.finish(containingTask.getActionExecutor(), false);
             fail("should throw an exception");
         } catch (Throwable t) {
             Optional<Throwable> optional =
-                    ExceptionUtils.findThrowableWithMessage(t, "test exception at closing");
+                    ExceptionUtils.findThrowableWithMessage(t, "test exception at finishing");
             assertTrue(optional.isPresent());
         }
     }
@@ -313,20 +313,22 @@ public class StreamOperatorWrapperTest extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
             ProcessingTimeCallback callback =
                     t1 ->
                             output.add(
                                     "["
                                             + name
-                                            + "]: Timer to put in mailbox when closing operator");
+                                            + "]: Timer to put in mailbox when finishing operator");
             assertNotNull(processingTimeService.registerTimer(0, callback));
             assertNull(timerMailController.getPuttingLatch(callback));
 
             mailboxExecutor.submit(
                     () ->
                             output.add(
-                                    "[" + name + "]: Mail to put in mailbox when closing operator"),
+                                    "["
+                                            + name
+                                            + "]: Mail to put in mailbox when finishing operator"),
                     "");
 
             output.add("[" + name + "]: Bye");
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f2123b7..78356a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -371,7 +371,7 @@ public class StreamTaskTest extends TestLogger {
                 throw ex;
             }
             if (!(ex.getCause().getSuppressed()[0]
-                    instanceof FailingTwiceOperator.DisposeException)) {
+                    instanceof FailingTwiceOperator.CloseException)) {
                 throw ex;
             }
         }
@@ -387,13 +387,13 @@ public class StreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void dispose() throws Exception {
-            throw new DisposeException();
+        public void close() throws Exception {
+            throw new CloseException();
         }
 
-        class DisposeException extends Exception {
-            public DisposeException() {
-                super("Dispose Exception. This exception should be suppressed");
+        static class CloseException extends Exception {
+            public CloseException() {
+                super("Close Exception. This exception should be suppressed");
             }
         }
     }
@@ -1143,10 +1143,10 @@ public class StreamTaskTest extends TestLogger {
      */
     @Test
     public void testOperatorClosingBeforeStopRunning() throws Throwable {
-        BlockingCloseStreamOperator.resetLatches();
+        BlockingFinishStreamOperator.resetLatches();
         Configuration taskConfiguration = new Configuration();
         StreamConfig streamConfig = new StreamConfig(taskConfiguration);
-        streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
+        streamConfig.setStreamOperator(new BlockingFinishStreamOperator());
         streamConfig.setOperatorID(new OperatorID());
 
         try (MockEnvironment mockEnvironment =
@@ -1158,16 +1158,16 @@ public class StreamTaskTest extends TestLogger {
                         .setTaskConfiguration(taskConfiguration)
                         .build()) {
 
-            RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task =
+            RunningTask<StreamTask<Void, BlockingFinishStreamOperator>> task =
                     runTask(() -> new NoOpStreamTask<>(mockEnvironment));
 
-            BlockingCloseStreamOperator.inClose.await();
+            BlockingFinishStreamOperator.inClose.await();
 
             // check that the StreamTask is not yet in isRunning == false
             assertTrue(task.streamTask.isRunning());
 
             // let the operator finish its close operation
-            BlockingCloseStreamOperator.finishClose.trigger();
+            BlockingFinishStreamOperator.finishClose.trigger();
 
             task.waitForTaskCompletion(false);
 
@@ -1184,7 +1184,7 @@ public class StreamTaskTest extends TestLogger {
      */
     @Test
     public void testNotifyCheckpointOnClosedOperator() throws Throwable {
-        ClosingOperator operator = new ClosingOperator();
+        ClosingOperator<Integer> operator = new ClosingOperator<>();
         StreamTaskMailboxTestHarnessBuilder<Integer> builder =
                 new StreamTaskMailboxTestHarnessBuilder<>(
                                 OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
@@ -1198,15 +1198,15 @@ public class StreamTaskTest extends TestLogger {
 
         harness.streamTask.notifyCheckpointCompleteAsync(1);
         harness.streamTask.runMailboxStep();
-        assertEquals(1, operator.notified.get());
-        assertEquals(false, operator.closed.get());
+        assertEquals(1, ClosingOperator.notified.get());
+        assertFalse(ClosingOperator.closed.get());
 
         // close operators directly, so that task is still fully running
-        harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
+        harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor());
         harness.streamTask.notifyCheckpointCompleteAsync(2);
         harness.streamTask.runMailboxStep();
-        assertEquals(1, operator.notified.get());
-        assertEquals(true, operator.closed.get());
+        assertEquals(1, ClosingOperator.notified.get());
+        assertTrue(ClosingOperator.closed.get());
     }
 
     @Test
@@ -1247,7 +1247,7 @@ public class StreamTaskTest extends TestLogger {
      */
     @Test
     public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
-        ClosingOperator operator = new ClosingOperator();
+        ClosingOperator<Integer> operator = new ClosingOperator<>();
         StreamTaskMailboxTestHarnessBuilder<Integer> builder =
                 new StreamTaskMailboxTestHarnessBuilder<>(
                                 OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
@@ -1258,8 +1258,8 @@ public class StreamTaskTest extends TestLogger {
         harness.setAutoProcess(false);
         harness.processElement(new StreamRecord<>(1));
 
-        harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
-        assertEquals(true, operator.closed.get());
+        harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor());
+        assertTrue(ClosingOperator.closed.get());
 
         harness.streamTask.triggerCheckpointOnBarrier(
                 new CheckpointMetaData(1, 0),
@@ -2057,14 +2057,14 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
-    private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
+    private static class BlockingFinishStreamOperator extends AbstractStreamOperator<Void> {
         private static final long serialVersionUID = -9042150529568008847L;
 
         private static volatile OneShotLatch inClose;
         private static volatile OneShotLatch finishClose;
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
             checkLatches();
             inClose.trigger();
             finishClose.await();
@@ -2673,7 +2673,7 @@ public class StreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
             super.close();
             closed.set(true);
         }
@@ -2769,7 +2769,7 @@ public class StreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void dispose() throws Exception {
+        public void close() throws Exception {
             wasClosed = true;
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index de9c703..f023e10 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -547,10 +547,10 @@ public class SubtaskCheckpointCoordinatorTest {
         public void open() throws Exception {}
 
         @Override
-        public void close() throws Exception {}
+        public void finish() throws Exception {}
 
         @Override
-        public void dispose() {}
+        public void close() throws Exception {}
 
         @Override
         public void prepareSnapshotPreBarrier(long checkpointId) {}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
index b5f523d..05673c0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
@@ -53,12 +53,18 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         ProcessingTimeService timeService = getProcessingTimeService();
         timeService.registerTimer(
                 timeService.getCurrentProcessingTime(),
-                t -> output("[" + name + "]: Timer registered in close"));
+                t -> output("[" + name + "]: Timer registered in finish"));
+
+        output("[" + name + "]: Finish");
+        super.finish();
+    }
 
+    @Override
+    public void close() throws Exception {
         output("[" + name + "]: Bye");
         super.close();
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java
index 5edd4fe..8dd49d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java
@@ -51,14 +51,19 @@ public class TestBoundedTwoInputOperator extends AbstractStreamOperator<String>
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         ProcessingTimeService timeService = getProcessingTimeService();
         timeService.registerTimer(
                 timeService.getCurrentProcessingTime(),
                 t -> output("[" + name + "]: Timer registered in close"));
 
-        output.collect(new StreamRecord<>("[" + name + "]: Bye"));
-        super.close();
+        output.collect(new StreamRecord<>("[" + name + "]: Finish"));
+        super.finish();
+    }
+
+    @Override
+    public void close() throws Exception {
+        output("[" + name + "]: Bye");
     }
 
     private void output(String record) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 6e1fb62..d1ca9fc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -710,9 +710,11 @@ public class TwoInputStreamTaskTest {
                 new StreamRecord<>("[Operator0-1]: End of input"),
                 new StreamRecord<>("[Operator0-2]: Hello-2"),
                 new StreamRecord<>("[Operator0-2]: End of input"),
-                new StreamRecord<>("[Operator0]: Bye"),
+                new StreamRecord<>("[Operator0]: Finish"),
                 new StreamRecord<>("[Operator1]: End of input"),
-                new StreamRecord<>("[Operator1]: Bye"));
+                new StreamRecord<>("[Operator1]: Finish"),
+                new StreamRecord<>("[Operator1]: Bye"),
+                new StreamRecord<>("[Operator0]: Bye"));
 
         final Object[] output = testHarness.getOutput().toArray();
         assertArrayEquals("Output was not correct.", expected.toArray(), output);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 3663052..95ad25e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -686,14 +686,14 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
         operator.notifyCheckpointComplete(checkpointId);
     }
 
-    /** Calls close and dispose on the operator. */
+    /** Calls finish and close on the operator. */
     public void close() throws Exception {
-        operator.close();
-        operator.dispose();
         if (processingTimeService != null) {
             processingTimeService.shutdownService();
         }
         setupCalled = false;
+        operator.finish();
+        operator.close();
 
         if (internalEnvironment.isPresent()) {
             internalEnvironment.get().close();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
deleted file mode 100644
index 6db4adf..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.operators.AbstractInput;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
-import org.apache.flink.streaming.api.operators.BoundedMultiInput;
-import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Arrays;
-import java.util.List;
-
-/** A test operator class implementing {@link BoundedMultiInput}. */
-public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2<String>
-        implements MultipleInputStreamOperator<String>, BoundedMultiInput {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String name;
-
-    public TestBoundedMultipleInputOperator(
-            String name, StreamOperatorParameters<String> parameters) {
-        super(parameters, 3);
-        this.name = name;
-    }
-
-    @Override
-    public List<Input> getInputs() {
-        return Arrays.asList(
-                new TestInput(this, 1), new TestInput(this, 2), new TestInput(this, 3));
-    }
-
-    @Override
-    public void endInput(int inputId) {
-        output.collect(new StreamRecord<>("[" + name + "-" + inputId + "]: End of input"));
-    }
-
-    @Override
-    public void close() throws Exception {
-        output.collect(new StreamRecord<>("[" + name + "]: Bye"));
-        super.close();
-    }
-
-    class TestInput extends AbstractInput<String, String> {
-        public TestInput(AbstractStreamOperatorV2<String> owner, int inputId) {
-            super(owner, inputId);
-        }
-
-        @Override
-        public void processElement(StreamRecord<String> element) throws Exception {
-            output.collect(
-                    element.replace("[" + name + "-" + inputId + "]: " + element.getValue()));
-        }
-    }
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
index 73f6f1f..2a96c62 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
@@ -159,8 +159,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
+    public void close() throws Exception {
+        super.close();
         if (helper != null) {
             helper.close();
         }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
index b94da3d..ef67090 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
@@ -40,8 +40,6 @@ public abstract class TableStreamOperator<OUT> extends AbstractStreamOperator<OU
     /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
     protected long currentWatermark = Long.MIN_VALUE;
 
-    private volatile boolean closed = false;
-
     protected transient ContextImpl ctx;
 
     public TableStreamOperator() {
@@ -55,20 +53,6 @@ public abstract class TableStreamOperator<OUT> extends AbstractStreamOperator<OU
     }
 
     @Override
-    public void close() throws Exception {
-        super.close();
-        closed = true;
-    }
-
-    @Override
-    public void dispose() throws Exception {
-        if (!closed) {
-            close();
-        }
-        super.dispose();
-    }
-
-    @Override
     public void processWatermark(Watermark mark) throws Exception {
         super.processWatermark(mark);
         currentWatermark = mark.getTimestamp();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index 63f184a..fbdfe48 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -66,9 +66,6 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
     /** This is used for emitting elements with a given timestamp. */
     protected transient TimestampedCollector<RowData> collector;
 
-    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
-    private transient boolean functionsClosed = false;
-
     /** current watermark of this operator. */
     private transient long currentWatermark;
 
@@ -94,7 +91,6 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
     @Override
     public void open() throws Exception {
         super.open();
-        functionsClosed = false;
 
         collector = new TimestampedCollector<>(output);
         collector.eraseTimestamp();
@@ -142,22 +138,11 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
     public void close() throws Exception {
         super.close();
         collector = null;
-        functionsClosed = true;
         if (windowBuffer != null) {
             windowBuffer.close();
         }
     }
 
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        collector = null;
-        if (!functionsClosed) {
-            functionsClosed = true;
-            windowBuffer.close();
-        }
-    }
-
     /** Compute memory size from memory faction. */
     private long computeMemorySize() {
         final Environment environment = getContainingTask().getEnvironment();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
index eba6e32..523594d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
@@ -151,28 +151,30 @@ public abstract class AbstractMapBundleOperator<K, V, IN, OUT> extends AbstractS
     }
 
     @Override
+    public void finish() throws Exception {
+        finishBundle();
+        super.finish();
+    }
+
+    @Override
     public void close() throws Exception {
+        Exception exception = null;
+
         try {
-            finishBundle();
-        } finally {
-            Exception exception = null;
-
-            try {
-                super.close();
-                if (function != null) {
-                    FunctionUtils.closeFunction(function);
-                }
-            } catch (InterruptedException interrupted) {
-                exception = interrupted;
-
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                exception = e;
+            super.close();
+            if (function != null) {
+                FunctionUtils.closeFunction(function);
             }
+        } catch (InterruptedException interrupted) {
+            exception = interrupted;
 
-            if (exception != null) {
-                LOG.warn("Errors occurred while closing the BundleOperator.", exception);
-            }
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
+            LOG.warn("Errors occurred while closing the BundleOperator.", exception);
         }
     }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
index a874922..3d3b560 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
@@ -134,6 +134,7 @@ public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorW
     @Override
     public void close() throws Exception {
         FunctionUtils.closeFunction(joinCondition);
+        super.close();
     }
 
     /**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
index bdd7cac..63ae148 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
@@ -220,6 +220,7 @@ public class TemporalRowTimeJoinOperator extends 
BaseTwoInputStreamOperatorWithS
         if (joinCondition != null) {
             joinCondition.close();
         }
+        super.close();
     }
 
     /**
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
index d51064f..66a5de2 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
@@ -92,9 +92,6 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
     private final boolean[] filterNullKeys;
     private final ZoneId shiftTimeZone;
 
-    /** Flag to prevent duplicate function.close() calls in close() and 
dispose(). */
-    private transient boolean functionsClosed = false;
-
     private transient WindowTimerService<Long> windowTimerService;
 
     // ------------------------------------------------------------------------
@@ -136,7 +133,6 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
     @Override
     public void open() throws Exception {
         super.open();
-        functionsClosed = false;
 
         this.collector = new TimestampedCollector<>(output);
         collector.eraseTimestamp();
@@ -197,25 +193,12 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
     public void close() throws Exception {
         super.close();
         collector = null;
-        functionsClosed = true;
         if (joinCondition != null) {
             joinCondition.close();
         }
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        collector = null;
-        if (!functionsClosed) {
-            functionsClosed = true;
-            if (joinCondition != null) {
-                joinCondition.close();
-            }
-        }
-    }
-
-    @Override
     public void processElement1(StreamRecord<RowData> element) throws 
Exception {
         processElement(element, leftWindowEndIndex, 
leftLateRecordsDroppedRate, leftWindowState);
     }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
index 32863f8..ba28e51 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
@@ -130,29 +130,29 @@ public abstract class MultipleInputStreamOperatorBase 
extends AbstractStreamOper
     }
 
     /**
-     * Closes all sub-operators in a multiple input operator effect way. 
Closing happens from
+     * Finishes all sub-operators of a multiple input operator effectively. 
Finishing happens from
      * <b>head to tail</b> sub-operator in a multiple input operator, contrary 
to {@link
      * StreamOperator#open()} which happens <b>tail to head</b>.
      */
     @Override
-    public void close() throws Exception {
-        super.close();
+    public void finish() throws Exception {
+        super.finish();
         for (TableOperatorWrapper<?> wrapper : topologicalOrderingOperators) {
-            wrapper.close();
+            StreamOperator<?> operator = wrapper.getStreamOperator();
+            operator.finish();
         }
     }
 
     /**
-     * Dispose all sub-operators in a multiple input operator effect way. 
Disposing happens from
+     * Closes all sub-operators of a multiple input operator effectively. 
Closing happens from
      * <b>head to tail</b> sub-operator in a multiple input operator, contrary 
to {@link
      * StreamOperator#open()} which happens <b>tail to head</b>.
      */
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
+    public void close() throws Exception {
+        super.close();
         for (TableOperatorWrapper<?> wrapper : topologicalOrderingOperators) {
-            StreamOperator<?> operator = wrapper.getStreamOperator();
-            operator.dispose();
+            wrapper.close();
         }
     }
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
index 642c03f..133f6e5 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
@@ -134,8 +134,8 @@ public class StreamSortOperator extends 
TableStreamOperator<RowData>
     }
 
     @Override
-    public void close() throws Exception {
-        LOG.info("Closing StreamSortOperator");
+    public void finish() throws Exception {
+        LOG.info("Finishing StreamSortOperator");
 
         // BoundedOneInput can not coexistence with checkpoint, so we emit 
output in close.
         if (!inputBuffer.isEmpty()) {
@@ -153,6 +153,6 @@ public class StreamSortOperator extends 
TableStreamOperator<RowData>
                         }
                     });
         }
-        super.close();
+        super.finish();
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java
index 77344b7..c2b5b29 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java
@@ -154,9 +154,6 @@ public abstract class WindowOperator<K, W extends Window> 
extends AbstractStream
     /** This is used for emitting elements with a given timestamp. */
     protected transient TimestampedCollector<RowData> collector;
 
-    /** Flag to prevent duplicate function.close() calls in close() and 
dispose(). */
-    private transient boolean functionsClosed = false;
-
     private transient InternalTimerService<W> internalTimerService;
 
     private transient InternalValueState<K, W, RowData> windowState;
@@ -242,8 +239,6 @@ public abstract class WindowOperator<K, W extends Window> 
extends AbstractStream
     public void open() throws Exception {
         super.open();
 
-        functionsClosed = false;
-
         collector = new TimestampedCollector<>(output);
         collector.eraseTimestamp();
 
@@ -318,26 +313,12 @@ public abstract class WindowOperator<K, W extends Window> 
extends AbstractStream
         super.close();
         collector = null;
         triggerContext = null;
-        functionsClosed = true;
         if (windowAggregator != null) {
             windowAggregator.close();
         }
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        collector = null;
-        triggerContext = null;
-        if (!functionsClosed) {
-            functionsClosed = true;
-            if (windowAggregator != null) {
-                windowAggregator.close();
-            }
-        }
-    }
-
-    @Override
     public void processElement(StreamRecord<RowData> record) throws Exception {
         RowData inputRow = record.getValue();
         long timestamp;
@@ -407,10 +388,6 @@ public abstract class WindowOperator<K, W extends Window> 
extends AbstractStream
 
     @Override
     public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
-        if (functionsClosed) {
-            return;
-        }
-
         setCurrentKey(timer.getKey());
 
         triggerContext.window = timer.getNamespace();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
index 516f09b..1e2e093 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
@@ -110,9 +110,6 @@ public final class SlicingWindowOperator<K, W> extends 
TableStreamOperator<RowDa
     /** This is used for emitting elements with a given timestamp. */
     protected transient TimestampedCollector<RowData> collector;
 
-    /** Flag to prevent duplicate function.close() calls in close() and 
dispose(). */
-    private transient boolean functionsClosed = false;
-
     /** The service to register timers. */
     private transient InternalTimerService<W> internalTimerService;
 
@@ -135,7 +132,6 @@ public final class SlicingWindowOperator<K, W> extends 
TableStreamOperator<RowDa
     @Override
     public void open() throws Exception {
         super.open();
-        functionsClosed = false;
 
         lastTriggeredProcessingTime = Long.MIN_VALUE;
         collector = new TimestampedCollector<>(output);
@@ -178,21 +174,10 @@ public final class SlicingWindowOperator<K, W> extends 
TableStreamOperator<RowDa
     public void close() throws Exception {
         super.close();
         collector = null;
-        functionsClosed = true;
         windowProcessor.close();
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        collector = null;
-        if (!functionsClosed) {
-            functionsClosed = true;
-            windowProcessor.close();
-        }
-    }
-
-    @Override
     public void processElement(StreamRecord<RowData> element) throws Exception 
{
         RowData inputRow = element.getValue();
         RowData currentKey = (RowData) getCurrentKey();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
index 69d9bf3..8ae817e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
@@ -106,9 +106,4 @@ public class ProcTimeMiniBatchAssignerOperator extends 
AbstractStreamOperator<Ro
             output.emitWatermark(mark);
         }
     }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-    }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperator.java
index 5fcbc17..079eec0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperator.java
@@ -98,8 +98,8 @@ public class RowTimeMiniBatchAssginerOperator extends 
AbstractStreamOperator<Row
     }
 
     @Override
-    public void close() throws Exception {
-        super.close();
+    public void finish() throws Exception {
+        super.finish();
 
         // emit the buffered watermark
         advanceWatermark();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
index d8b3a07..c0e664d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
@@ -56,9 +56,6 @@ public class WatermarkAssignerOperator extends 
AbstractStreamOperator<RowData>
 
     private transient long lastRecordTime;
 
-    /** Flag to prevent duplicate function.close() calls in close() and 
dispose(). */
-    private transient boolean functionsClosed = false;
-
     private transient StreamStatus currentStatus = StreamStatus.ACTIVE;
 
     /**
@@ -183,20 +180,14 @@ public class WatermarkAssignerOperator extends 
AbstractStreamOperator<RowData>
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         // all records have been processed, emit a final watermark
         processWatermark(Watermark.MAX_WATERMARK);
-
-        functionsClosed = true;
-        FunctionUtils.closeFunction(watermarkGenerator);
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (!functionsClosed) {
-            functionsClosed = true;
-            FunctionUtils.closeFunction(watermarkGenerator);
-        }
+    public void close() throws Exception {
+        FunctionUtils.closeFunction(watermarkGenerator);
+        super.close();
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java
index ee24464..d9dde71 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java
@@ -133,37 +133,6 @@ public class BatchMultipleInputStreamOperatorTest extends 
MultipleInputTestBase
     }
 
     @Test
-    public void testDispose() throws Exception {
-        TestingBatchMultipleInputStreamOperator op = 
createMultipleInputStreamOperator();
-        TestingTwoInputStreamOperator joinOp2 =
-                (TestingTwoInputStreamOperator) 
op.getTailWrapper().getStreamOperator();
-
-        TableOperatorWrapper<?> joinWrapper1 = 
op.getTailWrapper().getInputWrappers().get(0);
-        TestingTwoInputStreamOperator joinOp1 =
-                (TestingTwoInputStreamOperator) 
joinWrapper1.getStreamOperator();
-
-        TableOperatorWrapper<?> aggWrapper1 = 
joinWrapper1.getInputWrappers().get(0);
-        TestingOneInputStreamOperator aggOp1 =
-                (TestingOneInputStreamOperator) 
aggWrapper1.getStreamOperator();
-
-        TableOperatorWrapper<?> aggWrapper2 = 
joinWrapper1.getInputWrappers().get(1);
-        TestingOneInputStreamOperator aggOp2 =
-                (TestingOneInputStreamOperator) 
aggWrapper2.getStreamOperator();
-
-        assertFalse(aggOp1.isDisposed());
-        assertFalse(aggOp2.isDisposed());
-        assertFalse(aggOp1.isDisposed());
-        assertFalse(joinOp2.isDisposed());
-
-        op.dispose();
-
-        assertTrue(aggOp1.isDisposed());
-        assertTrue(aggOp2.isDisposed());
-        assertTrue(joinOp1.isDisposed());
-        assertTrue(joinOp2.isDisposed());
-    }
-
-    @Test
     public void testClose() throws Exception {
         TestingBatchMultipleInputStreamOperator op = 
createMultipleInputStreamOperator();
         TestingTwoInputStreamOperator joinOp2 =
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java
index efec9a9..c570db9 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java
@@ -41,7 +41,6 @@ public class TestingOneInputStreamOperator extends 
AbstractStreamOperator<RowDat
     private Watermark currentWatermark = null;
     private LatencyMarker currentLatencyMarker = null;
     private boolean isEnd = false;
-    private boolean isDisposed = false;
     private boolean isClosed = false;
     private final List<StreamRecord<RowData>> receivedElements = new 
ArrayList<>();
 
@@ -91,11 +90,6 @@ public class TestingOneInputStreamOperator extends 
AbstractStreamOperator<RowDat
     }
 
     @Override
-    public void dispose() throws Exception {
-        this.isDisposed = true;
-    }
-
-    @Override
     public void close() throws Exception {
         isClosed = true;
     }
@@ -120,10 +114,6 @@ public class TestingOneInputStreamOperator extends 
AbstractStreamOperator<RowDat
         return isEnd;
     }
 
-    public boolean isDisposed() {
-        return isDisposed;
-    }
-
     public boolean isClosed() {
         return isClosed;
     }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingTwoInputStreamOperator.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingTwoInputStreamOperator.java
index 25a40c9..252fa9a 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingTwoInputStreamOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingTwoInputStreamOperator.java
@@ -45,7 +45,6 @@ public class TestingTwoInputStreamOperator extends 
AbstractStreamOperator<RowDat
     private LatencyMarker currentLatencyMarker1 = null;
     private LatencyMarker currentLatencyMarker2 = null;
     private final List<Integer> endInputs = new ArrayList<>();
-    private boolean isDisposed = false;
     private boolean isClosed = false;
 
     public TestingTwoInputStreamOperator() {
@@ -118,11 +117,6 @@ public class TestingTwoInputStreamOperator extends 
AbstractStreamOperator<RowDat
     }
 
     @Override
-    public void dispose() throws Exception {
-        this.isDisposed = true;
-    }
-
-    @Override
     public void close() throws Exception {
         isClosed = true;
     }
@@ -159,10 +153,6 @@ public class TestingTwoInputStreamOperator extends 
AbstractStreamOperator<RowDat
         return endInputs;
     }
 
-    public boolean isDisposed() {
-        return isDisposed;
-    }
-
     public boolean isClosed() {
         return isClosed;
     }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 1dc9413..84d71d2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -768,8 +768,8 @@ public class TimestampITCase extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
-            super.close();
+        public void finish() throws Exception {
+            super.finish();
             finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = 
watermarks;
         }
     }

Reply via email to