AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r493025464



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -361,4 +360,89 @@ public String toString() {
                                '}';
                }
        }
+
+       /**
+        * Helper class for persisting channel state via {@link ChannelStateWriter}.
+        */
+       @NotThreadSafe
+       protected final class ChannelStatePersister {
+               private static final long CHECKPOINT_COMPLETED = -1;
+
+               private static final long BARRIER_RECEIVED = -2;
+
+               /** All started checkpoints where a barrier has not been received yet. */
+               private long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
+
+               /** Writer must be initialized before usage. {@link #startPersisting(long, List)} enforces this invariant. */
+               @Nullable
+               private final ChannelStateWriter channelStateWriter;
+
+               public ChannelStatePersister(@Nullable ChannelStateWriter channelStateWriter) {
+                       this.channelStateWriter = channelStateWriter;
+               }
+
+               protected void startPersisting(long barrierId, List<Buffer> knownBuffers) {
+                       checkState(isInitialized(), "Channel state writer not injected");
+
+                       if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
+                               pendingCheckpointBarrierId = barrierId;
+                       }
+                       if (knownBuffers.size() > 0) {
+                               channelStateWriter.addInputData(
+                                       barrierId,
+                                       channelInfo,
+                                       ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       CloseableIterator.fromList(knownBuffers, Buffer::recycleBuffer));
+                       }
+               }
+
+               protected boolean isInitialized() {
+                       return channelStateWriter != null;
+               }
+
+               protected void stopPersisting() {
+                       pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
+               }
+
+               protected void maybePersist(Buffer buffer) {
+                       if (pendingCheckpointBarrierId >= 0 && buffer.isBuffer()) {
+                               channelStateWriter.addInputData(
+                                       pendingCheckpointBarrierId,
+                                       getChannelInfo(),
+                                       ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+                       }
+               }
+
+               protected boolean checkForBarrier(Buffer buffer) throws IOException {
+                       final AbstractEvent priorityEvent = parsePriorityEvent(buffer);
+                       if (priorityEvent instanceof CheckpointBarrier) {
+                               pendingCheckpointBarrierId = BARRIER_RECEIVED;
+                               return true;
+                       }
+                       return false;
+               }
+
+               /**
+                * Parses the buffer as an event and returns the {@link CheckpointBarrier} if the event is indeed a barrier or
+                * returns null in all other cases.
+                */
+               @Nullable
+               protected AbstractEvent parsePriorityEvent(Buffer buffer) throws IOException {
+                       if (buffer.isBuffer() || !buffer.getDataType().hasPriority()) {
+                               return null;
+                       }
+
+                       AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+                       // reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer.
+                       // we can further improve to avoid double deserialization in the future.
+                       buffer.setReaderIndex(0);
+                       return event;
+               }

Review comment:
       This method is only used in this class, so I inlined it. As is, it looks like part of the Persister's interface.
   
   There is similar code for `BufferConsumer` on the output side, but the implementations differ too much to align (buffer copy vs. reader index reset, different `EventSerializer.fromBuffer` overloads).
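
   To illustrate the difference mentioned above, here is a minimal sketch of the two ways to peek at buffered data without consuming it. It uses the JDK's `java.nio.ByteBuffer` as a stand-in, not Flink's `Buffer` or `BufferConsumer`; the class and method names are hypothetical and only meant to show the general idea.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only: java.nio.ByteBuffer stands in for Flink's buffer types.
public class PeekStrategies {

    /** Reads from the shared buffer, then rewinds it (analogous to a reader index reset). */
    public static int peekWithReset(ByteBuffer buffer) {
        int start = buffer.position();
        int value = buffer.getInt();
        buffer.position(start); // rewind so the next reader sees the same bytes again
        return value;
    }

    /** Reads through an independent view, so the original position is never touched. */
    public static int peekWithCopy(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // shares the content, has its own position
        return view.getInt();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putInt(7).putInt(11);
        buffer.flip();

        System.out.println(peekWithReset(buffer));
        System.out.println(peekWithCopy(buffer));
        System.out.println(buffer.position()); // unchanged by either peek
    }
}
```

   Both variants leave the caller's read position where it was. The reset variant mutates and restores shared state, while the copy variant never touches it, which is one reason the two code paths are hard to unify.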



