rkhachatryan commented on a change in pull request #12292:
URL: https://github.com/apache/flink/pull/12292#discussion_r430065985



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -25,8 +25,10 @@
 import org.apache.flink.runtime.state.InputChannelStateHandle;

Review comment:
       > some more elaborate explanation what was the underlying problem
   
   I've updated the PR description since then; does it look good to you now?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
##########
@@ -149,8 +152,8 @@ public void testRecordingOffsets() throws Exception {
        }
 
        private void write(ChannelStateCheckpointWriter writer, InputChannelInfo channelInfo, byte[] data) throws Exception {
-               NetworkBuffer buffer = new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(data.length, null), FreeingBufferRecycler.INSTANCE);
-               buffer.setBytes(0, data);
+               MemorySegment segment = wrap(data);
+               NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());

Review comment:
       The writer index of `NetworkBuffer` wasn't updated. This change updates it by passing `segment.size()` to the constructor.
   
   Updated the commit message.
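To illustrate why the writer index matters, here is a minimal sketch using a hypothetical `SimpleBuffer` class (a stand-in, not Flink's `NetworkBuffer`): a buffer that wraps an already-filled segment but leaves its writer index at 0 reports no readable bytes, while passing the size to the constructor makes the data visible.

```java
import java.nio.charset.StandardCharsets;

// Sketch only: SimpleBuffer is a hypothetical stand-in, not Flink's NetworkBuffer.
public class WriterIndexDemo {

    // Wraps a pre-filled byte array and tracks reader and writer indices;
    // only the bytes between the two indices count as readable.
    static final class SimpleBuffer {
        private final byte[] segment;
        private final int readerIndex;
        private final int writerIndex;

        // The size argument plays the role of segment.size() in the PR:
        // it tells the buffer how many bytes already hold data.
        SimpleBuffer(byte[] segment, int size) {
            if (size < 0 || size > segment.length) {
                throw new IllegalArgumentException("size out of range: " + size);
            }
            this.segment = segment;
            this.readerIndex = 0;
            this.writerIndex = size;
        }

        int readableBytes() {
            return writerIndex - readerIndex;
        }

        int capacity() {
            return segment.length;
        }
    }

    public static void main(String[] args) {
        byte[] data = "channel-state".getBytes(StandardCharsets.UTF_8);

        // Writer index left at 0: the bytes are in the segment but nothing is readable.
        SimpleBuffer stale = new SimpleBuffer(data, 0);
        // Writer index set to the segment size: all of the data is readable.
        SimpleBuffer fixed = new SimpleBuffer(data, data.length);

        System.out.println(stale.capacity() + " " + stale.readableBytes()); // 13 0
        System.out.println(fixed.capacity() + " " + fixed.readableBytes()); // 13 13
    }
}
```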

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -180,17 +177,33 @@ private void doComplete(boolean precondition, RunnableWithException complete, Ru
        }
 
        private <I, H extends AbstractChannelStateHandle<I>> void complete(
+                       StreamStateHandle underlying,
                        CompletableFuture<Collection<H>> future,
                        Map<I, List<Long>> offsets,
-                       BiFunction<I, List<Long>, H> buildHandle) {
+                       TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle) throws IOException {

Review comment:
       Totally agree.
   Extracted a `HandleFactory` interface and replaced `Map.Entry` with two arguments.
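A rough sketch of what such a factory could look like. Only the `HandleFactory` name comes from the comment; the single `create(info, underlying, offsets)` method, its parameter order and all the stand-in types are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: all types here are hypothetical stand-ins for the Flink classes.
public class HandleFactoryDemo {

    // Stand-in for StreamStateHandle.
    interface UnderlyingHandle {}

    // Assumed shape of a dedicated factory interface: unlike a generic TriFunction,
    // the type name and parameter names say what each argument is for.
    @FunctionalInterface
    interface HandleFactory<I, H> {
        H create(I info, UnderlyingHandle underlying, List<Long> offsets);
    }

    // Stand-in for a per-channel state handle.
    static final class ChannelHandle {
        final String channel;
        final UnderlyingHandle underlying;
        final List<Long> offsets;

        ChannelHandle(String channel, UnderlyingHandle underlying, List<Long> offsets) {
            this.channel = channel;
            this.underlying = underlying;
            this.offsets = offsets;
        }
    }

    // Key and value arrive as two arguments instead of a Map.Entry.
    static <I, H> H createHandle(
            UnderlyingHandle underlying, HandleFactory<I, H> factory, I info, List<Long> offsets) {
        return factory.create(info, underlying, offsets);
    }

    static <I, H> Collection<H> complete(
            UnderlyingHandle underlying, Map<I, List<Long>> offsets, HandleFactory<I, H> factory) {
        Collection<H> handles = new ArrayList<>();
        for (Map.Entry<I, List<Long>> e : offsets.entrySet()) {
            handles.add(createHandle(underlying, factory, e.getKey(), e.getValue()));
        }
        return handles;
    }

    public static void main(String[] args) {
        UnderlyingHandle shared = new UnderlyingHandle() {};
        Map<String, List<Long>> offsets = new LinkedHashMap<>();
        offsets.put("channel-0", Arrays.asList(0L, 128L));
        offsets.put("channel-1", Arrays.asList(64L));

        // A constructor reference satisfies the factory interface directly.
        Collection<ChannelHandle> handles = complete(shared, offsets, ChannelHandle::new);
        for (ChannelHandle h : handles) {
            System.out.println(h.channel + " " + h.offsets);
        }
    }
}
```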

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -180,17 +177,33 @@ private void doComplete(boolean precondition, RunnableWithException complete, Ru
        }
 
        private <I, H extends AbstractChannelStateHandle<I>> void complete(
+                       StreamStateHandle underlying,
                        CompletableFuture<Collection<H>> future,
                        Map<I, List<Long>> offsets,
-                       BiFunction<I, List<Long>, H> buildHandle) {
+                       TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle) throws IOException {
                final Collection<H> handles = new ArrayList<>();
                for (Map.Entry<I, List<Long>> e : offsets.entrySet()) {
-                       handles.add(buildHandle.apply(e.getKey(), e.getValue()));
+                       handles.add(createHandle(underlying, buildHandle, e));
                }
                future.complete(handles);
                LOG.debug("channel state write completed, checkpointId: {}, handles: {}", checkpointId, handles);
        }
 
+       private <I, H extends AbstractChannelStateHandle<I>> H createHandle(
+                       StreamStateHandle underlying,
+                       TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle,
+                       Map.Entry<I, List<Long>> e) throws IOException {
+               if (underlying instanceof ByteStreamStateHandle) {
+                       ByteStreamStateHandle byteHandle = (ByteStreamStateHandle) underlying;
+                       return buildHandle.apply(
+                               e.getKey(),
+                               new ByteStreamStateHandle(randomUUID().toString(), serializer.extractAndMerge(byteHandle.getData(), e.getValue())),
+                               singletonList(serializer.getHeaderLength()));
+               } else {

Review comment:
       Added `StreamStateHandle.asBytesIfInMemory`, which returns `Optional<byte[]>` (better names are welcome ;)
   
   > Why is this issue new for spilled channel state? What's different for operators state?
   
   Operators don't share state handles with each other, so they don't have this problem.
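A minimal sketch of the pattern, assuming the default implementation returns `Optional.empty()` and only in-memory handles override it. The types are simplified, hypothetical stand-ins, and the method added in the PR may differ (for example, in whether it returns a copy of the bytes).

```java
import java.util.Optional;

// Sketch only: StateHandle, InMemoryHandle and FileHandle are hypothetical
// stand-ins for StreamStateHandle, ByteStreamStateHandle and a file-backed handle.
public class AsBytesIfInMemoryDemo {

    interface StateHandle {
        // Default: the state is stored elsewhere, so no bytes are exposed.
        default Optional<byte[]> asBytesIfInMemory() {
            return Optional.empty();
        }
    }

    static final class InMemoryHandle implements StateHandle {
        private final byte[] data;

        InMemoryHandle(byte[] data) {
            this.data = data;
        }

        // An in-memory handle overrides the default and exposes its bytes.
        @Override
        public Optional<byte[]> asBytesIfInMemory() {
            return Optional.of(data);
        }
    }

    // A file-backed handle keeps the default behaviour.
    static final class FileHandle implements StateHandle {}

    // Callers branch on the Optional instead of using instanceof plus a cast.
    static String describe(StateHandle handle) {
        return handle.asBytesIfInMemory()
                .map(bytes -> "in memory, " + bytes.length + " bytes")
                .orElse("not in memory");
    }

    public static void main(String[] args) {
        System.out.println(describe(new InMemoryHandle(new byte[] {1, 2, 3})));
        System.out.println(describe(new FileHandle()));
    }
}
```

Compared with the `instanceof ByteStreamStateHandle` check in `createHandle`, the call site no longer needs to know the concrete handle class.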

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -180,17 +177,33 @@ private void doComplete(boolean precondition, RunnableWithException complete, Ru
        }
 
        private <I, H extends AbstractChannelStateHandle<I>> void complete(
+                       StreamStateHandle underlying,
                        CompletableFuture<Collection<H>> future,
                        Map<I, List<Long>> offsets,
-                       BiFunction<I, List<Long>, H> buildHandle) {
+                       TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle) throws IOException {
                final Collection<H> handles = new ArrayList<>();
                for (Map.Entry<I, List<Long>> e : offsets.entrySet()) {
-                       handles.add(buildHandle.apply(e.getKey(), e.getValue()));
+                       handles.add(createHandle(underlying, buildHandle, e));
                }
                future.complete(handles);
                LOG.debug("channel state write completed, checkpointId: {}, handles: {}", checkpointId, handles);
        }
 
+       private <I, H extends AbstractChannelStateHandle<I>> H createHandle(
+                       StreamStateHandle underlying,
+                       TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle,
+                       Map.Entry<I, List<Long>> e) throws IOException {
+               if (underlying instanceof ByteStreamStateHandle) {
+                       ByteStreamStateHandle byteHandle = (ByteStreamStateHandle) underlying;
+                       return buildHandle.apply(
+                               e.getKey(),
+                               new ByteStreamStateHandle(randomUUID().toString(), serializer.extractAndMerge(byteHandle.getData(), e.getValue())),
+                               singletonList(serializer.getHeaderLength()));
+               } else {

Review comment:
       There is only one `InputChannelStateHandle` per channel.
   Putting multiple channels into a single handle could make recovery (rescaling) more difficult.
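Because each channel gets its own handle, an in-memory blob shared by several channels has to be split per channel, as `createHandle` does with `serializer.extractAndMerge`. The sketch below shows the idea under an invented byte layout (length-prefixed chunks, a one-int header); the layout, `HEADER_LENGTH` and `writeChunk` are assumptions for illustration, not the actual channel state serializer format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Sketch only: the byte layout below is an assumption made for illustration.
//   shared blob: chunks written as [int length][length bytes], interleaved across channels
//   merged blob: [int version header][int total length][payload bytes of all chunks]
public class ExtractAndMergeDemo {

    static final int HEADER_LENGTH = Integer.BYTES; // hypothetical header: one int version
    static final int VERSION = 1;

    // Copies the chunks at the given offsets out of the shared blob into a new,
    // standalone array that belongs to a single channel.
    static byte[] extractAndMerge(byte[] shared, List<Long> offsets) throws IOException {
        ByteBuffer in = ByteBuffer.wrap(shared); // big-endian, matching DataOutputStream
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        for (long offset : offsets) {
            int start = (int) offset;
            int length = in.getInt(start); // absolute read of the chunk length
            payload.write(shared, start + Integer.BYTES, length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        dataOut.writeInt(VERSION);        // header
        dataOut.writeInt(payload.size()); // one merged chunk: its length...
        payload.writeTo(dataOut);         // ...followed by its bytes
        dataOut.flush();
        return out.toByteArray();
    }

    static void writeChunk(DataOutputStream out, String text) throws IOException {
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream blob = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(blob);
        writeChunk(dataOut, "ab");  // channel A, offset 0
        writeChunk(dataOut, "xyz"); // channel B, offset 6
        writeChunk(dataOut, "cd");  // channel A, offset 13
        byte[] shared = blob.toByteArray();

        byte[] channelA = extractAndMerge(shared, Arrays.asList(0L, 13L));
        // The new handle would carry a single offset pointing just past the header.
        int chunkLength = ByteBuffer.wrap(channelA).getInt(HEADER_LENGTH);
        String chunk = new String(
                channelA, HEADER_LENGTH + Integer.BYTES, chunkLength, StandardCharsets.US_ASCII);
        System.out.println(chunk); // abcd
    }
}
```

This mirrors the shape of the diff, where the new per-channel handle gets `singletonList(serializer.getHeaderLength())` as its only offset.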

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -25,8 +25,10 @@
 import org.apache.flink.runtime.state.InputChannelStateHandle;

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
