pnowojski commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r425583062



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
##########
@@ -67,8 +70,11 @@ void run() {
                } catch (Exception ex) {
                        thrown = ex;
                } finally {
-                       cleanupRequests();
-                       dispatcher.fail(thrown == null ? new 
CancellationException() : thrown);
+                       try {
+                               closeAll(this::cleanupRequests, () -> 
dispatcher.fail(thrown == null ? new CancellationException() : thrown));
+                       } catch (Exception e) {
+                               LOG.error("unable to terminate properly", e);
+                       }

Review comment:
       we had quite a bit of problems with error logging like that polluting 
the logs failing e2e tests. It would be better to add `e` as suppressed 
exception to the `thrown`, like 
`org.apache.flink.streaming.runtime.tasks.StreamTask#invoke` is doing.
   ```
   } catch (Exception e) {
        dispatcher.fail(ExceptionUtils.firstOrSuppressed(e, thrown));
   }
   ```
   ?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
##########
@@ -290,4 +309,49 @@ public int read(byte[] b, int off, int len) {
        public int read(byte[] b) {
                return read(b, 0, b.length);
        }
+
+       ByteBuffer wrapIntoByteBuffer() {
+               return segment.wrap(position, remaining());
+       }
+
+       int copyContentTo(byte[] dst) {
+               final int numBytesChunk = remaining();
+               segment.get(position, dst, 0, numBytesChunk);
+               return numBytesChunk;
+       }
+
+       /**
+        * Copies the data and transfers the "ownership" (i.e. clears current 
wrapper).
+        */
+       void transferTo(ByteBuffer dst) {
+               segment.get(position, dst, remaining());
+               clear();
+       }
+
+       Tuple2<DeserializationResult, Integer> getNextRecord(IOReadableWritable 
target) throws IOException {
+               int recordLen = readInt();
+               if (canReadRecord(recordLen)) {
+                       return readInto(target);
+               } else {
+                       return new Tuple2<>(PARTIAL_RECORD, recordLen);
+               }
+       }

Review comment:
       nit: by wrapper I meant a POJO not Tuple2, but this is probably fine.
   
   However now I'm a bit worried about performance implications of this - it's 
a critical/hot path, and we are allocating new object on it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to