pnowojski commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r425583062
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
##########
@@ -67,8 +70,11 @@ void run() {
} catch (Exception ex) {
thrown = ex;
} finally {
- cleanupRequests();
- dispatcher.fail(thrown == null ? new
CancellationException() : thrown);
+ try {
+ closeAll(this::cleanupRequests, () ->
dispatcher.fail(thrown == null ? new CancellationException() : thrown));
+ } catch (Exception e) {
+ LOG.error("unable to terminate properly", e);
+ }
Review comment:
we had quite a bit of problems with error logging like that polluting
the logs failing e2e tests. It would be better to add `e` as suppressed
exception to the `thrown`, like
`org.apache.flink.streaming.runtime.tasks.StreamTask#invoke` is doing.
```
} catch (Exception e) {
dispatcher.fail(ExceptionUtils.firstOrSuppressed(e, thrown));
}
```
?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
##########
@@ -290,4 +309,49 @@ public int read(byte[] b, int off, int len) {
public int read(byte[] b) {
return read(b, 0, b.length);
}
+
+ ByteBuffer wrapIntoByteBuffer() {
+ return segment.wrap(position, remaining());
+ }
+
+ int copyContentTo(byte[] dst) {
+ final int numBytesChunk = remaining();
+ segment.get(position, dst, 0, numBytesChunk);
+ return numBytesChunk;
+ }
+
+ /**
+ * Copies the data and transfers the "ownership" (i.e. clears current
wrapper).
+ */
+ void transferTo(ByteBuffer dst) {
+ segment.get(position, dst, remaining());
+ clear();
+ }
+
+ Tuple2<DeserializationResult, Integer> getNextRecord(IOReadableWritable
target) throws IOException {
+ int recordLen = readInt();
+ if (canReadRecord(recordLen)) {
+ return readInto(target);
+ } else {
+ return new Tuple2<>(PARTIAL_RECORD, recordLen);
+ }
+ }
Review comment:
nit: by wrapper I meant a POJO not Tuple2, but this is probably fine.
However now I'm a bit worried about performance implications of this - it's
a critical/hot path, and we are allocating new object on it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]