curcur commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r504675916



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumerWithPartialRecordLength.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * BufferConsumer with partial record length if a record is spanning over buffers.
+ *
+ * <p>`partialRecordLength` is the length of bytes to skip in order to start with a complete record,
+ * from position index 0 of the underlying MemorySegment. `partialRecordLength` is used in approximate
+ * local recovery to find the start position of a complete record on a BufferConsumer, the so-called
+ * `partial record clean-up`.
+ *
+ * <p>Partial records occur when a record cannot fit into one buffer; the remaining part of the same
+ * record is then put into the next buffer. Hence partial records only exist at the beginning of a buffer.
+ * Partial record clean-up is needed in the mode of approximate local recovery: if a record spans
+ * multiple buffers and the first (several) buffers have been lost due to the failure of the receiver
+ * task, the remaining in-transit data belonging to the same record should be cleaned up.
+ *
+ * <p>If partialRecordLength == 0, the buffer starts with a complete record.</p>
+ * <p>If partialRecordLength > 0, the buffer starts with a partial record of length partialRecordLength.</p>
+ * <p>If partialRecordLength < 0, partialRecordLength is undefined. It is currently used in
+ * {@link ResultSubpartitionRecoveredStateHandler#recover}.</p>
+ */
+@NotThreadSafe
+public class BufferConsumerWithPartialRecordLength {
+       private final BufferConsumer bufferConsumer;
+       private final int partialRecordLength;
+
+       public BufferConsumerWithPartialRecordLength(BufferConsumer bufferConsumer, int partialRecordLength) {
+               this.bufferConsumer = checkNotNull(bufferConsumer);
+               this.partialRecordLength = partialRecordLength;
+       }
+
+       public BufferConsumer getBufferConsumer() {
+               return bufferConsumer;
+       }
+
+       public int getPartialRecordLength() {
+               return partialRecordLength;
+       }
+
+       public Buffer build() {
+               return bufferConsumer.build();
+       }
+
+       public PartialRecordCleanupResult cleanupPartialRecord() {

Review comment:
       
   > * `skipBuild(...)` would be replaced with `skip(...)`, that would just move the offset, without returning the buffer
   > * `.build()` call would be required afterwards, to get the remaining data
   
   That would work perfectly if we did not have the case `bytesToSkip == buffer.getMaxCapacity()`.
   
   In the current build(), `currentReaderPosition` can never be `buffer.getMaxCapacity()`, because once the full buffer has been read, `currentReaderPosition` is set to `cachedWriterPosition` and the consumer has already been marked as finished. Remember that when a record spans buffers, we finish the builder first and then request a new one.
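   For illustration, here is a minimal, self-contained sketch (toy types only, not the real `BufferConsumer` API) of the proposed `skip(...)` + `build()` split and the edge case it would have to deal with:

```java
/**
 * Toy stand-in for a buffer consumer over a fixed-size memory segment.
 * Hypothetical sketch only: it mimics the proposed skip(...) + build() split
 * to show what happens when bytesToSkip equals the segment's max capacity.
 */
class ToyConsumer {
    private final byte[] segment;
    private final int writerPosition;
    private int readerPosition;

    ToyConsumer(byte[] segment, int writerPosition) {
        this.segment = segment;
        this.writerPosition = writerPosition;
    }

    /** Proposed skip(...): only moves the offset, returns nothing. */
    void skip(int bytesToSkip) {
        readerPosition += bytesToSkip;
    }

    /** Builds the remaining readable data, similar in spirit to a build() call. */
    byte[] build() {
        byte[] slice = new byte[writerPosition - readerPosition];
        System.arraycopy(segment, readerPosition, slice, 0, slice.length);
        readerPosition = writerPosition;
        return slice;
    }

    public static void main(String[] args) {
        byte[] segment = new byte[8];
        ToyConsumer consumer = new ToyConsumer(segment, segment.length);

        // Partial record fills the whole buffer: skip() moves the reader position
        // to max capacity, so the following build() yields an empty slice. This is
        // the reader position the current build() never reaches, because a fully
        // read consumer has already been marked as finished.
        consumer.skip(segment.length);
        System.out.println("bytes left after full skip: " + consumer.build().length); // prints 0
    }
}
```

   The sketch only shows where the special case would surface; the real consumer of course carries more state (cached writer position, finished flag, etc.).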
   
   But I can still handle this special case in build() if you are fine with changing the build() method. I was trying not to touch build(), which is used in the normal PipelinedSubpartition read path.
   
   > Do you really need to return the buffer here? I have a feeling that the code would be simpler if:
   > 
   > * this method was returning just `true/false` if cleanup has finished or not.
   
   Yes, I need both `whether the clean-up is successful` and the `returned buffer`; they are used later in PipelinedApproximateSubpartition within the `pollBuffer` method to
   1. decide whether the clean-up was successful, and
   2. handle the sliced Buffer whether it was successful or not (see the sketch after this list).
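   As a rough sketch of that call site (hypothetical names and shapes; the actual `PipelinedApproximateSubpartition#pollBuffer` and the `PartialRecordCleanupResult` type in this PR may look different), both pieces of information are consumed together:

```java
/** Hypothetical sketch of the call site; toy types, not the real Flink classes. */
class PollBufferSketch {

    /** Stands in for a network Buffer; only the readable byte count matters here. */
    static class ToyBuffer {
        final int readableBytes;
        ToyBuffer(int readableBytes) { this.readableBytes = readableBytes; }
    }

    /** Pairs the two pieces of information listed above. */
    static class CleanupResult {
        final boolean cleanedUp;
        final ToyBuffer slicedBuffer;
        CleanupResult(boolean cleanedUp, ToyBuffer slicedBuffer) {
            this.cleanedUp = cleanedUp;
            this.slicedBuffer = slicedBuffer;
        }
    }

    /** What a pollBuffer-like caller needs from the result. */
    static ToyBuffer pollBuffer(CleanupResult result) {
        if (result.cleanedUp) {
            // 1. Clean-up finished: the sliced buffer now starts at a record
            //    boundary and can be handed downstream as usual.
            return result.slicedBuffer;
        }
        // 2. Clean-up not finished: the sliced buffer (possibly empty) still has
        //    to be dealt with, e.g. released, before cleaning up the next buffer.
        release(result.slicedBuffer);
        return null;
    }

    static void release(ToyBuffer buffer) {
        // placeholder for recycling the underlying memory segment
    }

    public static void main(String[] args) {
        System.out.println(pollBuffer(new CleanupResult(true, new ToyBuffer(42))) != null);  // true
        System.out.println(pollBuffer(new CleanupResult(false, new ToyBuffer(0))) != null);  // false
    }
}
```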
   
   The reason I am returning an `empty` buffer instead of a `null` buffer in the case of `bytesToSkip == buffer.getMaxCapacity()` is also to make it easier to follow the current logic in pollBuffer when no data is read.
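   A tiny, purely illustrative sketch of that idea (hypothetical helper, not the actual pollBuffer code): with an empty buffer, the caller keeps a single code path and only checks the readable bytes, just like it already does when no data was read:

```java
class EmptyBufferSketch {

    /** A single check covers both "no data read" and "whole buffer skipped". */
    static boolean hasDataToForward(java.nio.ByteBuffer sliced) {
        // With an empty buffer for bytesToSkip == maxCapacity, no null branch is needed.
        return sliced.remaining() > 0;
    }

    public static void main(String[] args) {
        System.out.println(hasDataToForward(java.nio.ByteBuffer.allocate(0)));  // false
        System.out.println(hasDataToForward(java.nio.ByteBuffer.allocate(4)));  // true
    }
}
```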
   
   Overall, I want to keep the clean-up logic wrapped inside `BufferConsumerWithPartialRecordLength`, which is the main reason this class is introduced.
   
   
   
    




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

