This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c9bf0368e9a233a2a013436628790c2c2b60bcb
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon May 18 20:29:05 2020 +0200

    [FLINK-17547][task] Use iterator for unconsumed buffers.
    Motivation: support spilled records.
    Changes:
    1. Change the SpillingAdaptiveSpanningRecordDeserializer.getUnconsumedBuffer
    signature.
    2. Adapt channel state persistence to the new types.
    
    No changes in existing logic.
---
 .../org/apache/flink/util/CloseableIterator.java   | 77 +++++++++++++++++++++-
 .../main/java/org/apache/flink/util/IOUtils.java   |  7 ++
 .../channel/ChannelStateWriteRequest.java          | 33 +++++++---
 .../ChannelStateWriteRequestDispatcherImpl.java    |  6 +-
 .../ChannelStateWriteRequestExecutorImpl.java      | 20 ++++--
 .../checkpoint/channel/ChannelStateWriter.java     |  6 +-
 .../checkpoint/channel/ChannelStateWriterImpl.java | 17 +++--
 .../api/serialization/NonSpanningWrapper.java      | 22 +++++--
 .../api/serialization/RecordDeserializer.java      |  4 +-
 .../network/api/serialization/SpanningWrapper.java | 12 ++--
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 15 +----
 .../partition/consumer/RemoteInputChannel.java     |  3 +-
 .../ChannelStateWriteRequestDispatcherTest.java    | 10 ++-
 .../ChannelStateWriteRequestExecutorImplTest.java  |  1 -
 .../channel/ChannelStateWriterImplTest.java        | 13 ++--
 .../channel/CheckpointInProgressRequestTest.java   |  7 +-
 .../checkpoint/channel/MockChannelStateWriter.java | 11 +++-
 .../channel/RecordingChannelStateWriter.java       | 12 +++-
 .../SpanningRecordSerializationTest.java           |  9 +--
 .../partition/consumer/SingleInputGateTest.java    | 14 +++-
 .../runtime/state/ChannelPersistenceITCase.java    |  3 +-
 .../runtime/io/CheckpointBarrierUnaligner.java     |  3 +-
 .../runtime/io/StreamTaskNetworkInput.java         | 11 ++--
 23 files changed, 235 insertions(+), 81 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java 
b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
index 09ea046..cc51324 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -20,10 +20,15 @@ package org.apache.flink.util;
 
 import javax.annotation.Nonnull;
 
+import java.util.ArrayDeque;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.Iterator;
+import java.util.List;
 import java.util.function.Consumer;
 
+import static java.util.Arrays.asList;
+
 /**
  * This interface represents an {@link Iterator} that is also {@link 
AutoCloseable}. A typical use-case for this
  * interface are iterators that are based on native-resources such as files, 
network, or database connections. Clients
@@ -37,7 +42,42 @@ public interface CloseableIterator<T> extends Iterator<T>, 
AutoCloseable {
 
        @Nonnull
        static <T> CloseableIterator<T> adapterForIterator(@Nonnull Iterator<T> 
iterator) {
-               return new IteratorAdapter<>(iterator);
+               return adapterForIterator(iterator, () -> {});
+       }
+
+       static <T> CloseableIterator<T> adapterForIterator(@Nonnull Iterator<T> 
iterator, AutoCloseable close) {
+               return new IteratorAdapter<>(iterator, close);
+       }
+
+       static <T> CloseableIterator<T> fromList(List<T> list, Consumer<T> 
closeNotConsumed) {
+               return new CloseableIterator<T>(){
+                       private final Deque<T> stack = new ArrayDeque<>(list);
+
+                       @Override
+                       public boolean hasNext() {
+                               return !stack.isEmpty();
+                       }
+
+                       @Override
+                       public T next() {
+                               return stack.poll();
+                       }
+
+                       @Override
+                       public void close() throws Exception {
+                               Exception exception = null;
+                               for (T el : stack) {
+                                       try {
+                                               closeNotConsumed.accept(el);
+                                       } catch (Exception e) {
+                                               exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                                       }
+                               }
+                               if (exception != null) {
+                                       throw exception;
+                               }
+                       }
+               };
        }
 
        @SuppressWarnings("unchecked")
@@ -45,6 +85,34 @@ public interface CloseableIterator<T> extends Iterator<T>, 
AutoCloseable {
                return (CloseableIterator<T>) EMPTY_INSTANCE;
        }
 
+       static <T> CloseableIterator<T> ofElements(Consumer<T> 
closeNotConsumed, T... elements) {
+               return fromList(asList(elements), closeNotConsumed);
+       }
+
+       static <E> CloseableIterator<E> ofElement(E element, Consumer<E> 
closeIfNotConsumed) {
+               return new CloseableIterator<E>(){
+                       private boolean hasNext = true;
+
+                       @Override
+                       public boolean hasNext() {
+                               return hasNext;
+                       }
+
+                       @Override
+                       public E next() {
+                               hasNext = false;
+                               return element;
+                       }
+
+                       @Override
+                       public void close() {
+                               if (hasNext) {
+                                       closeIfNotConsumed.accept(element);
+                               }
+                       }
+               };
+       }
+
        /**
         * Adapter from {@link Iterator} to {@link CloseableIterator}. Does 
nothing on {@link #close()}.
         *
@@ -54,9 +122,11 @@ public interface CloseableIterator<T> extends Iterator<T>, 
AutoCloseable {
 
                @Nonnull
                private final Iterator<E> delegate;
+               private final AutoCloseable close;
 
-               IteratorAdapter(@Nonnull Iterator<E> delegate) {
+               IteratorAdapter(@Nonnull Iterator<E> delegate, AutoCloseable 
close) {
                        this.delegate = delegate;
+                       this.close = close;
                }
 
                @Override
@@ -80,7 +150,8 @@ public interface CloseableIterator<T> extends Iterator<T>, 
AutoCloseable {
                }
 
                @Override
-               public void close() {
+               public void close() throws Exception {
+                       close.close();
                }
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 1f9af18..0b8f210 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -216,6 +216,13 @@ public final class IOUtils {
        }
 
        /**
+        * @see #closeAll(Iterable)
+        */
+       public static void closeAll(AutoCloseable... closeables) throws 
Exception {
+               closeAll(asList(closeables));
+       }
+
+       /**
         * Closes all {@link AutoCloseable} objects in the parameter, 
suppressing exceptions. Exception will be emitted
         * after calling close() on every object.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
index bd6c7ba..0848698 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -20,23 +20,24 @@ package org.apache.flink.runtime.checkpoint.channel;
 import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.CANCELLED;
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.COMPLETED;
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.EXECUTING;
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.FAILED;
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.NEW;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 interface ChannelStateWriteRequest {
        long getCheckpointId();
 
-       void cancel(Throwable cause);
+       void cancel(Throwable cause) throws Exception;
 
        static CheckpointInProgressRequest completeInput(long checkpointId) {
                return new CheckpointInProgressRequest("completeInput", 
checkpointId, ChannelStateCheckpointWriter::completeInput, false);
@@ -46,8 +47,24 @@ interface ChannelStateWriteRequest {
                return new CheckpointInProgressRequest("completeOutput", 
checkpointId, ChannelStateCheckpointWriter::completeOutput, false);
        }
 
-       static ChannelStateWriteRequest write(long checkpointId, 
InputChannelInfo info, Buffer... flinkBuffers) {
-               return new CheckpointInProgressRequest("writeInput", 
checkpointId, writer -> writer.writeInput(info, flinkBuffers), 
recycle(flinkBuffers), false);
+       static ChannelStateWriteRequest write(long checkpointId, 
InputChannelInfo info, CloseableIterator<Buffer> iterator) {
+               return new CheckpointInProgressRequest(
+                       "writeInput",
+                       checkpointId,
+                       writer -> {
+                               while (iterator.hasNext()) {
+                                       Buffer buffer = iterator.next();
+                                       try {
+                                               
checkArgument(buffer.isBuffer());
+                                       } catch (Exception e) {
+                                               buffer.recycleBuffer();
+                                               throw e;
+                                       }
+                                       writer.writeInput(info, buffer);
+                               }
+                       },
+                       throwable -> iterator.close(),
+                       false);
        }
 
        static ChannelStateWriteRequest write(long checkpointId, 
ResultSubpartitionInfo info, Buffer... flinkBuffers) {
@@ -62,7 +79,7 @@ interface ChannelStateWriteRequest {
                return new CheckpointInProgressRequest("abort", checkpointId, 
writer -> writer.fail(cause), true);
        }
 
-       static Consumer<Throwable> recycle(Buffer[] flinkBuffers) {
+       static ThrowingConsumer<Throwable, Exception> recycle(Buffer[] 
flinkBuffers) {
                return unused -> {
                        for (Buffer b : flinkBuffers) {
                                b.recycleBuffer();
@@ -112,7 +129,7 @@ enum CheckpointInProgressRequestState {
 
 final class CheckpointInProgressRequest implements ChannelStateWriteRequest {
        private final ThrowingConsumer<ChannelStateCheckpointWriter, Exception> 
action;
-       private final Consumer<Throwable> discardAction;
+       private final ThrowingConsumer<Throwable, Exception> discardAction;
        private final long checkpointId;
        private final String name;
        private final boolean ignoreMissingWriter;
@@ -123,7 +140,7 @@ final class CheckpointInProgressRequest implements 
ChannelStateWriteRequest {
                }, ignoreMissingWriter);
        }
 
-       CheckpointInProgressRequest(String name, long checkpointId, 
ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action, 
Consumer<Throwable> discardAction, boolean ignoreMissingWriter) {
+       CheckpointInProgressRequest(String name, long checkpointId, 
ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action, 
ThrowingConsumer<Throwable, Exception> discardAction, boolean 
ignoreMissingWriter) {
                this.checkpointId = checkpointId;
                this.action = checkNotNull(action);
                this.discardAction = checkNotNull(discardAction);
@@ -137,7 +154,7 @@ final class CheckpointInProgressRequest implements 
ChannelStateWriteRequest {
        }
 
        @Override
-       public void cancel(Throwable cause) {
+       public void cancel(Throwable cause) throws Exception {
                if (state.compareAndSet(NEW, CANCELLED) || 
state.compareAndSet(FAILED, CANCELLED)) {
                        discardAction.accept(cause);
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java
index 843663e..0a15b91 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java
@@ -51,7 +51,11 @@ final class ChannelStateWriteRequestDispatcherImpl 
implements ChannelStateWriteR
                try {
                        dispatchInternal(request);
                } catch (Exception e) {
-                       request.cancel(e);
+                       try {
+                               request.cancel(e);
+                       } catch (Exception ex) {
+                               e.addSuppressed(ex);
+                       }
                        throw e;
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
index cbcc3f7..e87a21c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
@@ -32,6 +32,9 @@ import java.util.List;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.IOUtils.closeAll;
 
 /**
  * Executes {@link ChannelStateWriteRequest}s in a separate thread. Any 
exception occurred during execution causes this
@@ -67,8 +70,15 @@ class ChannelStateWriteRequestExecutorImpl implements 
ChannelStateWriteRequestEx
                } catch (Exception ex) {
                        thrown = ex;
                } finally {
-                       cleanupRequests();
-                       dispatcher.fail(thrown == null ? new 
CancellationException() : thrown);
+                       try {
+                               closeAll(
+                                       this::cleanupRequests,
+                                       () -> dispatcher.fail(thrown == null ? 
new CancellationException() : thrown)
+                               );
+                       } catch (Exception e) {
+                               //noinspection NonAtomicOperationOnVolatileField
+                               thrown = ExceptionUtils.firstOrSuppressed(e, 
thrown);
+                       }
                }
                LOG.debug("loop terminated");
        }
@@ -87,14 +97,12 @@ class ChannelStateWriteRequestExecutorImpl implements 
ChannelStateWriteRequestEx
                }
        }
 
-       private void cleanupRequests() {
+       private void cleanupRequests() throws Exception {
                Throwable cause = thrown == null ? new CancellationException() 
: thrown;
                List<ChannelStateWriteRequest> drained = new ArrayList<>();
                deque.drainTo(drained);
                LOG.info("discarding {} drained requests", drained.size());
-               for (ChannelStateWriteRequest request : drained) {
-                       request.cancel(cause);
-               }
+               closeAll(drained.stream().<AutoCloseable>map(request -> () -> 
request.cancel(cause)).collect(Collectors.toList()));
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index e19b1e2..5dad559 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.util.CloseableIterator;
 
 import java.io.Closeable;
 import java.util.Collection;
@@ -99,11 +100,10 @@ public interface ChannelStateWriter extends Closeable {
         *                    It is intended to use for incremental snapshots.
         *                    If no data is passed it is ignored.
         * @param data zero or more <b>data</b> buffers ordered by their 
sequence numbers
-        * @throws IllegalArgumentException if one or more passed buffers 
{@link Buffer#isBuffer()  isn't a buffer}
         * @see 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
         * @see 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
         */
-       void addInputData(long checkpointId, InputChannelInfo info, int 
startSeqNum, Buffer... data) throws IllegalArgumentException;
+       void addInputData(long checkpointId, InputChannelInfo info, int 
startSeqNum, CloseableIterator<Buffer> data);
 
        /**
         * Add in-flight buffers from the {@link 
org.apache.flink.runtime.io.network.partition.ResultSubpartition 
ResultSubpartition}.
@@ -161,7 +161,7 @@ public interface ChannelStateWriter extends Closeable {
                }
 
                @Override
-               public void addInputData(long checkpointId, InputChannelInfo 
info, int startSeqNum, Buffer... data) {
+               public void addInputData(long checkpointId, InputChannelInfo 
info, int startSeqNum, CloseableIterator<Buffer> data) {
                }
 
                @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 412a9f5..b6fa588 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -103,10 +104,9 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
        }
 
        @Override
-       public void addInputData(long checkpointId, InputChannelInfo info, int 
startSeqNum, Buffer... data) {
-               LOG.debug("add input data, checkpoint id: {}, channel: {}, 
startSeqNum: {}, num buffers: {}",
-                       checkpointId, info, startSeqNum, data == null ? 0 : 
data.length);
-               enqueue(write(checkpointId, info, checkBufferType(data)), 
false);
+       public void addInputData(long checkpointId, InputChannelInfo info, int 
startSeqNum, CloseableIterator<Buffer> iterator) {
+               LOG.debug("add input data, checkpoint id: {}, channel: {}, 
startSeqNum: {}", checkpointId, info, startSeqNum);
+               enqueue(write(checkpointId, info, iterator), false);
        }
 
        @Override
@@ -168,8 +168,13 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
                                executor.submit(request);
                        }
                } catch (Exception e) {
-                       request.cancel(e);
-                       throw new RuntimeException("unable to send request to 
worker", e);
+                       RuntimeException wrapped = new RuntimeException("unable 
to send request to worker", e);
+                       try {
+                               request.cancel(e);
+                       } catch (Exception cancelException) {
+                               wrapped.addSuppressed(cancelException);
+                       }
+                       throw wrapped;
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index 5de5467..343c6f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -22,17 +22,21 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.CloseableIterator;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
-import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
 import static 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
 
 final class NonSpanningWrapper implements DataInputView {
 
@@ -69,13 +73,13 @@ final class NonSpanningWrapper implements DataInputView {
                this.limit = limit;
        }
 
-       Optional<MemorySegment> getUnconsumedSegment() {
+       CloseableIterator<Buffer> getUnconsumedSegment() {
                if (!hasRemaining()) {
-                       return Optional.empty();
+                       return CloseableIterator.empty();
                }
-               MemorySegment target = 
MemorySegmentFactory.allocateUnpooledSegment(remaining());
-               segment.copyTo(position, target, 0, remaining());
-               return Optional.of(target);
+               MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(remaining());
+               this.segment.copyTo(position, segment, 0, remaining());
+               return singleBufferIterator(segment);
        }
 
        boolean hasRemaining() {
@@ -359,4 +363,10 @@ final class NonSpanningWrapper implements DataInputView {
                return recordLength <= remaining();
        }
 
+       static CloseableIterator<Buffer> singleBufferIterator(MemorySegment 
target) {
+               return CloseableIterator.ofElement(
+                       new NetworkBuffer(target, 
FreeingBufferRecycler.INSTANCE, DATA_BUFFER, target.size()),
+                       Buffer::recycleBuffer);
+       }
+
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index 4f4d621..07ff5ff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
 
 import java.io.IOException;
-import java.util.Optional;
 
 /**
  * Interface for turning sequences of memory segments into records.
@@ -71,5 +71,5 @@ public interface RecordDeserializer<T extends 
IOReadableWritable> {
         * <p>Note that the unconsumed buffer might be null if the whole buffer 
was already consumed
         * before and there are no partial length or data remained in the end 
of buffer.
         */
-       Optional<Buffer> getUnconsumedBuffer() throws IOException;
+       CloseableIterator<Buffer> getUnconsumedBuffer() throws IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 430f0db..18ea6cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
@@ -34,11 +36,11 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
-import java.util.Optional;
 import java.util.Random;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
+import static 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.singleBufferIterator;
 import static 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
 import static org.apache.flink.util.FileUtils.writeCompletely;
 import static org.apache.flink.util.IOUtils.closeAllQuietly;
@@ -165,15 +167,15 @@ final class SpanningWrapper {
                }
        }
 
-       Optional<MemorySegment> getUnconsumedSegment() throws IOException {
+       CloseableIterator<Buffer> getUnconsumedSegment() throws IOException {
                if (isReadingLength()) {
-                       return Optional.of(copyLengthBuffer());
+                       return singleBufferIterator(copyLengthBuffer());
                } else if (isAboveSpillingThreshold()) {
                        throw new UnsupportedOperationException("Unaligned 
checkpoint currently do not support spilled records.");
                } else if (recordLength == -1) {
-                       return Optional.empty(); // no remaining partial length 
or data
+                       return CloseableIterator.empty(); // no remaining 
partial length or data
                } else {
-                       return Optional.of(copyDataBuffer());
+                       return singleBufferIterator(copyDataBuffer());
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 75e6b0b..2d4c24c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -21,18 +21,15 @@ package 
org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.CloseableIterator;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
-import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
-import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
 
 /**
  * @param <T> The type of the record to be deserialized.
@@ -76,14 +73,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
        }
 
        @Override
-       public Optional<Buffer> getUnconsumedBuffer() throws IOException {
-               final Optional<MemorySegment> unconsumedSegment;
-               if (nonSpanningWrapper.hasRemaining()) {
-                       unconsumedSegment = 
nonSpanningWrapper.getUnconsumedSegment();
-               } else {
-                       unconsumedSegment = 
spanningWrapper.getUnconsumedSegment();
-               }
-               return unconsumedSegment.map(segment -> new 
NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, 
segment.size()));
+       public CloseableIterator<Buffer> getUnconsumedBuffer() throws 
IOException {
+               return nonSpanningWrapper.hasRemaining() ? 
nonSpanningWrapper.getUnconsumedSegment() : 
spanningWrapper.getUnconsumedSegment();
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 6db81e9..4e1f260 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.util.CloseableIterator;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -207,7 +208,7 @@ public class RemoteInputChannel extends InputChannel {
                                checkpointId,
                                channelInfo,
                                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
-                               inflightBuffers.toArray(new Buffer[0]));
+                               CloseableIterator.fromList(inflightBuffers, 
Buffer::recycleBuffer));
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
index f953c22..00c8ca7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
@@ -17,8 +17,13 @@
 
 package org.apache.flink.runtime.checkpoint.channel;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.CloseableIterator;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -83,7 +88,10 @@ public class ChannelStateWriteRequestDispatcherTest {
        }
 
        private static ChannelStateWriteRequest writeIn() {
-               return write(CHECKPOINT_ID, new InputChannelInfo(1, 1));
+               return write(CHECKPOINT_ID, new InputChannelInfo(1, 1), 
CloseableIterator.ofElement(
+                       new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1), 
FreeingBufferRecycler.INSTANCE),
+                       Buffer::recycleBuffer
+               ));
        }
 
        private static ChannelStateWriteRequest writeOut() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
index 5aad9c6..a299b34 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.LinkedBlockingDeque;
 
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher.NO_OP;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 44552e6..92a7e88 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
+import static org.apache.flink.util.CloseableIterator.ofElements;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
@@ -47,14 +48,16 @@ public class ChannelStateWriterImplTest {
        private static final long CHECKPOINT_ID = 42L;
 
        @Test(expected = IllegalArgumentException.class)
-       public void testAddEventBuffer() {
+       public void testAddEventBuffer() throws Exception {
+
                NetworkBuffer dataBuf = getBuffer();
                NetworkBuffer eventBuf = getBuffer();
                eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER);
-               ChannelStateWriterImpl writer = openWriter();
-               callStart(writer);
                try {
-                       writer.addInputData(CHECKPOINT_ID, new 
InputChannelInfo(1, 1), 1, eventBuf, dataBuf);
+                       runWithSyncWorker(writer -> {
+                               callStart(writer);
+                               writer.addInputData(CHECKPOINT_ID, new 
InputChannelInfo(1, 1), 1, ofElements(Buffer::recycleBuffer, eventBuf, 
dataBuf));
+                       });
                } finally {
                        assertTrue(dataBuf.isRecycled());
                }
@@ -285,7 +288,7 @@ public class ChannelStateWriterImplTest {
        }
 
        private void callAddInputData(ChannelStateWriter writer, 
NetworkBuffer... buffer) {
-               writer.addInputData(CHECKPOINT_ID, new InputChannelInfo(1, 1), 
1, buffer);
+               writer.addInputData(CHECKPOINT_ID, new InputChannelInfo(1, 1), 
1, ofElements(Buffer::recycleBuffer, buffer));
        }
 
        private void callAbort(ChannelStateWriter writer) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java
index 3617b8f..556bf49 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * {@link CheckpointInProgressRequest} test.
@@ -41,7 +42,11 @@ public class CheckpointInProgressRequestTest {
                Thread[] threads = new Thread[barrier.getParties()];
                for (int i = 0; i < barrier.getParties(); i++) {
                        threads[i] = new Thread(() -> {
-                               request.cancel(new RuntimeException("test"));
+                               try {
+                                       request.cancel(new 
RuntimeException("test"));
+                               } catch (Exception e) {
+                                       fail(e.getMessage());
+                               }
                                await(barrier);
                        });
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 5dcc00c..0a61066d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
@@ -19,6 +19,9 @@ package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
+
+import static org.apache.flink.util.ExceptionUtils.rethrow;
 
 /**
  * A no op implementation that performs basic checks of the contract, but does 
not actually write any data.
@@ -49,10 +52,12 @@ public class MockChannelStateWriter implements 
ChannelStateWriter {
        }
 
        @Override
-       public void addInputData(long checkpointId, InputChannelInfo info, int 
startSeqNum, Buffer... data) {
+       public void addInputData(long checkpointId, InputChannelInfo info, int 
startSeqNum, CloseableIterator<Buffer> iterator) {
                checkCheckpointId(checkpointId);
-               for (final Buffer buffer : data) {
-                       buffer.recycleBuffer();
+               try {
+                       iterator.close();
+               } catch (Exception e) {
+                       rethrow(e);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
index b53e37b..d0cfe3f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
@@ -19,12 +19,15 @@ package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
 
 import 
org.apache.flink.shaded.guava18.com.google.common.collect.LinkedListMultimap;
 import org.apache.flink.shaded.guava18.com.google.common.collect.ListMultimap;
 
 import java.util.Arrays;
 
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
 /**
  * A simple {@link ChannelStateWriter} used to write unit tests.
  */
@@ -54,9 +57,14 @@ public class RecordingChannelStateWriter extends 
MockChannelStateWriter {
        }
 
        @Override
-       public void addInputData(long checkpointId, InputChannelInfo info, int 
startSeqNum, Buffer... data) {
+       public void addInputData(long checkpointId, InputChannelInfo info, int 
startSeqNum, CloseableIterator<Buffer> iterator) {
                checkCheckpointId(checkpointId);
-               addedInput.putAll(info, Arrays.asList(data));
+               iterator.forEachRemaining(b -> addedInput.put(info, b));
+               try {
+                       iterator.close();
+               } catch (Exception e) {
+                       rethrow(e);
+               }
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 183df10..e35b311 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import 
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -46,7 +47,6 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.Random;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
@@ -293,14 +293,15 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
                }
        }
 
-       private static void assertUnconsumedBuffer(ByteArrayOutputStream 
expected, Optional<Buffer> actual) {
-               if (!actual.isPresent()) {
+       private static void assertUnconsumedBuffer(ByteArrayOutputStream 
expected, CloseableIterator<Buffer> actual) throws Exception {
+               if (!actual.hasNext()) {
                        Assert.assertEquals(expected.size(), 0);
                }
 
                ByteBuffer expectedByteBuffer = 
ByteBuffer.wrap(expected.toByteArray());
-               ByteBuffer actualByteBuffer = 
actual.get().getNioBufferReadable();
+               ByteBuffer actualByteBuffer = 
actual.next().getNioBufferReadable();
                Assert.assertEquals(expectedByteBuffer, actualByteBuffer);
+               actual.close();
        }
 
        private static void writeBuffer(ByteBuffer buffer, OutputStream stream) 
throws IOException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index abcf563..6f49b44 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -66,6 +66,7 @@ import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
@@ -74,7 +75,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -94,6 +94,7 @@ import static 
org.apache.flink.runtime.io.network.partition.InputGateFairnessTes
 import static 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.submitTasksAndWaitForResults;
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static 
org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -989,8 +990,15 @@ public class SingleInputGateTest extends InputGateTestBase 
{
 
                inputChannel.spillInflightBuffers(0, new 
ChannelStateWriterImpl.NoOpChannelStateWriter() {
                        @Override
-                       public void addInputData(long checkpointId, 
InputChannelInfo info, int startSeqNum, Buffer... data) {
-                               inflightBuffers.addAll(Arrays.asList(data));
+                       public void addInputData(long checkpointId, 
InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
+                               List<Buffer> list = new ArrayList<>();
+                               iterator.forEachRemaining(list::add);
+                               inflightBuffers.addAll(list);
+                               try {
+                                       iterator.close();
+                               } catch (Exception e) {
+                                       rethrow(e);
+                               }
                        }
                });
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index 3f5e2cc..a77dbbf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -50,6 +50,7 @@ import static java.util.Collections.singletonMap;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.ReadResult.NO_MORE_DATA;
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN;
+import static org.apache.flink.util.CloseableIterator.ofElements;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertArrayEquals;
 
@@ -102,7 +103,7 @@ public class ChannelPersistenceITCase {
                        writer.open();
                        writer.start(checkpointId, new 
CheckpointOptions(CHECKPOINT, new 
CheckpointStorageLocationReference("poly".getBytes())));
                        for (Map.Entry<InputChannelInfo, Buffer> e : 
icBuffers.entrySet()) {
-                               writer.addInputData(checkpointId, e.getKey(), 
SEQUENCE_NUMBER_UNKNOWN, e.getValue());
+                               writer.addInputData(checkpointId, e.getKey(), 
SEQUENCE_NUMBER_UNKNOWN, ofElements(Buffer::recycleBuffer, e.getValue()));
                        }
                        writer.finishInput(checkpointId);
                        for (Map.Entry<ResultSubpartitionInfo, Buffer> e : 
rsBuffers.entrySet()) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index f98e83f..d39accf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.util.CloseableIterator.ofElement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -330,7 +331,7 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
                                        currentReceivedCheckpointId,
                                        channelInfo,
                                        
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
-                                       buffer);
+                                       ofElement(buffer, 
Buffer::recycleBuffer));
                        } else {
                                buffer.recycleBuffer();
                        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 07826c7..6723a2d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -212,12 +212,11 @@ public final class StreamTaskNetworkInput<T> implements 
StreamTaskInput<T> {
                        // Assumption for retrieving buffers = one concurrent 
checkpoint
                        RecordDeserializer<?> deserializer = 
recordDeserializers[channelIndex];
                        if (deserializer != null) {
-                               
deserializer.getUnconsumedBuffer().ifPresent(buffer ->
-                                       channelStateWriter.addInputData(
-                                               checkpointId,
-                                               channel.getChannelInfo(),
-                                               
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
-                                               buffer));
+                               channelStateWriter.addInputData(
+                                       checkpointId,
+                                       channel.getChannelInfo(),
+                                       
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       deserializer.getUnconsumedBuffer());
                        }
 
                        
checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, 
channelStateWriter);

Reply via email to