pnowojski commented on a change in pull request #12292:
URL: https://github.com/apache/flink/pull/12292#discussion_r429886453
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
##########
@@ -149,8 +152,8 @@ public void testRecordingOffsets() throws Exception {
}
private void write(ChannelStateCheckpointWriter writer, InputChannelInfo channelInfo, byte[] data) throws Exception {
- NetworkBuffer buffer = new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(data.length, null), FreeingBufferRecycler.INSTANCE);
- buffer.setBytes(0, data);
+ MemorySegment segment = wrap(data);
+ NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());
Review comment:
What was the problem here? Could you explain it in the commit message?
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -25,8 +25,10 @@
import org.apache.flink.runtime.state.InputChannelStateHandle;
Review comment:
Please copy the PR description into the commit message of this (the last) commit.
Could you also extend both of those descriptions, and the JIRA ticket description (copy/paste), with a more elaborate explanation of what the underlying problem was?
> That the buffered bytes in `StreamStateHandle underlying` (if it's a `ByteStreamStateHandle`) would be referenced many times, once per input channel and result partition, by the respective `InputChannelStateHandle` and `ResultSubpartitionStateHandle` handles. Each of those handles would thus duplicate and contain all of the data for every channel, while using only a small portion of it.
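A minimal model of that duplication, not Flink code: `SharedBufferModel` and its methods are hypothetical, and the sketch assumes each handle ends up carrying the whole shared buffer once it is serialized on its own. With k handles over one n-byte buffer that is k·n bytes, versus n bytes when each handle keeps only a copy of its own range.

```java
import java.util.Arrays;
import java.util.Map;

// Illustrative model only (not Flink code): one shared buffer holds the state of all
// channels back to back, and each channel owns the [start, end) range of its bytes.
final class SharedBufferModel {
    // Total bytes carried by the handles if every handle embeds the whole shared buffer,
    // e.g. because each handle is serialized on its own.
    static long bytesIfShared(byte[] shared, int handleCount) {
        return (long) shared.length * handleCount;
    }

    // Total bytes carried if each handle embeds only a copy of its own range.
    static long bytesIfSliced(byte[] shared, Map<String, int[]> rangesByChannel) {
        long total = 0;
        for (Map.Entry<String, int[]> entry : rangesByChannel.entrySet()) {
            int[] range = entry.getValue();
            byte[] own = Arrays.copyOfRange(shared, range[0], range[1]);
            total += own.length;
        }
        return total;
    }
}
```

For a 100-byte buffer split into four 25-byte channel ranges, `bytesIfShared(buffer, 4)` returns 400 while `bytesIfSliced` returns 100.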
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -180,17 +177,33 @@ private void doComplete(boolean precondition, RunnableWithException complete, Ru
}
private <I, H extends AbstractChannelStateHandle<I>> void complete(
+ StreamStateHandle underlying,
CompletableFuture<Collection<H>> future,
Map<I, List<Long>> offsets,
- BiFunction<I, List<Long>, H> buildHandle) {
+ TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle) throws IOException {
Review comment:
These two functions (`complete` and `createHandle`) are quite difficult to read and understand. For example, by looking at the signature
```
private <I, H extends AbstractChannelStateHandle<I>> H createHandle(
StreamStateHandle underlying,
TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle,
Map.Entry<I, List<Long>> e)
```
I have no idea what's happening here.
I would at the very least:
1. either extract `buildHandle` into a factory/builder, or simply replace it with a boolean flag (`boolean buildInputHandles`) or an enum (`INPUT_HANDLE`/`OUTPUT_HANDLE`) and `if`/`switch` on it inside;
2. not pass a `Map.Entry`, but convert it to a POJO or named variables as early as possible.
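A rough sketch of both suggestions, using hypothetical stand-in types (`HandleKind`, `ChannelHandle`, `InputHandle`, `OutputHandle`, `HandleAssembler`) rather than Flink's real `InputChannelStateHandle`/`ResultSubpartitionStateHandle`, whose constructors differ: the handle type is chosen by an enum `switch`, and each `Map.Entry` is unpacked into named variables right away.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical: which kind of handle to build, replacing the TriFunction parameter.
enum HandleKind { INPUT, OUTPUT }

// Hypothetical stand-ins for InputChannelStateHandle / ResultSubpartitionStateHandle.
abstract class ChannelHandle {
    final String channel;
    final List<Long> offsets;

    ChannelHandle(String channel, List<Long> offsets) {
        this.channel = channel;
        this.offsets = offsets;
    }
}

final class InputHandle extends ChannelHandle {
    InputHandle(String channel, List<Long> offsets) { super(channel, offsets); }
}

final class OutputHandle extends ChannelHandle {
    OutputHandle(String channel, List<Long> offsets) { super(channel, offsets); }
}

final class HandleAssembler {
    // Builds one handle per channel; the handle kind is an explicit enum argument.
    static Collection<ChannelHandle> complete(HandleKind kind, Map<String, List<Long>> offsetsByChannel) {
        Collection<ChannelHandle> handles = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : offsetsByChannel.entrySet()) {
            // Unpack the entry into named variables as early as possible.
            String channel = entry.getKey();
            List<Long> channelOffsets = entry.getValue();
            handles.add(createHandle(kind, channel, channelOffsets));
        }
        return handles;
    }

    // Plain, named parameters instead of a Map.Entry and a TriFunction.
    static ChannelHandle createHandle(HandleKind kind, String channel, List<Long> offsets) {
        switch (kind) {
            case INPUT:
                return new InputHandle(channel, offsets);
            case OUTPUT:
                return new OutputHandle(channel, offsets);
            default:
                throw new IllegalArgumentException("Unknown handle kind: " + kind);
        }
    }
}
```

Compared with passing a `TriFunction`, the call site states which kind of handle it wants, and `createHandle` reads from its parameter names alone.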
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
##########
@@ -180,17 +177,33 @@ private void doComplete(boolean precondition, RunnableWithException complete, Ru
}
private <I, H extends AbstractChannelStateHandle<I>> void complete(
+ StreamStateHandle underlying,
CompletableFuture<Collection<H>> future,
Map<I, List<Long>> offsets,
- BiFunction<I, List<Long>, H> buildHandle) {
+ TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle) throws IOException {
final Collection<H> handles = new ArrayList<>();
for (Map.Entry<I, List<Long>> e : offsets.entrySet()) {
- handles.add(buildHandle.apply(e.getKey(), e.getValue()));
+ handles.add(createHandle(underlying, buildHandle, e));
}
future.complete(handles);
LOG.debug("channel state write completed, checkpointId: {}, handles: {}", checkpointId, handles);
}
+ private <I, H extends AbstractChannelStateHandle<I>> H createHandle(
+ StreamStateHandle underlying,
+ TriFunction<I, StreamStateHandle, List<Long>, H> buildHandle,
+ Map.Entry<I, List<Long>> e) throws IOException {
+ if (underlying instanceof ByteStreamStateHandle) {
+ ByteStreamStateHandle byteHandle = (ByteStreamStateHandle) underlying;
+ return buildHandle.apply(
+ e.getKey(),
+ new ByteStreamStateHandle(randomUUID().toString(), serializer.extractAndMerge(byteHandle.getData(), e.getValue())),
+ singletonList(serializer.getHeaderLength()));
+ } else {
Review comment:
This doesn't look like a generic solution, and it seems error prone. It looks like something should be extracted into `StreamStateHandle`, e.g. `boolean StreamStateHandle#isDirectlyHoldingData()`? Or we are missing some different abstraction for shared state handles.
Why is this issue new for spilled channel state? What's different for operator state?
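One way the check could move into the handle itself, sketched with hypothetical types (`SimpleStateHandle`, `InMemoryHandle`, `FileHandle` and `forRange` are stand-ins, not Flink's `StreamStateHandle` hierarchy) and assuming a channel's bytes form one contiguous range: besides the proposed `isDirectlyHoldingData()` flag, each handle decides what a single channel should reference, so the writer needs no `instanceof`.

```java
import java.util.Arrays;

// Hypothetical simplified stand-in for StreamStateHandle.
interface SimpleStateHandle {
    // Proposed flag: true if the handle embeds the bytes rather than pointing to them.
    boolean isDirectlyHoldingData();

    // Hypothetical: the handle a single channel should reference for the [start, end) range.
    SimpleStateHandle forRange(int start, int end);
}

// Hypothetical in-memory handle: the bytes live inside the handle itself.
final class InMemoryHandle implements SimpleStateHandle {
    final byte[] data;

    InMemoryHandle(byte[] data) { this.data = data; }

    @Override
    public boolean isDirectlyHoldingData() { return true; }

    @Override
    public SimpleStateHandle forRange(int start, int end) {
        // Copy only this channel's bytes so no handle carries the whole buffer.
        return new InMemoryHandle(Arrays.copyOfRange(data, start, end));
    }
}

// Hypothetical file-backed handle: only a path is stored.
final class FileHandle implements SimpleStateHandle {
    final String path;

    FileHandle(String path) { this.path = path; }

    @Override
    public boolean isDirectlyHoldingData() { return false; }

    @Override
    public SimpleStateHandle forRange(int start, int end) {
        // The bytes live in the file; sharing this reference duplicates nothing.
        return this;
    }
}
```

The writer would then call `underlying.forRange(start, end)` once per channel. The diff above also rewrites each channel's offsets (`singletonList(serializer.getHeaderLength())`); this sketch leaves that out.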
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]