rkhachatryan commented on code in PR #28107:
URL: https://github.com/apache/flink/pull/28107#discussion_r3196380052
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/FilteredBufferDispatcher.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.channel;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * Dispatches filtered channel-state data across multiple channels' {@link
+ * org.apache.flink.runtime.io.network.partition.consumer.RecoveredBufferStore}s. {@link
+ * #write(byte[], int, InputChannelInfo)} pushes data for a target channel; the implementation
+ * decides whether to use a network buffer (P1), spill to disk (P2), or replay from disk (P3).
+ */
+@Internal
+public interface FilteredBufferDispatcher extends AutoCloseable {

Review Comment:
   Why is this interface (and `FilteredSpillFile`) not per-channel? I think this adds quite a lot of complexity.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/FilteredBufferDispatcher.java:
##########
@@ -0,0 +1,53 @@
+@Internal
+public interface FilteredBufferDispatcher extends AutoCloseable {
+
+    void write(byte[] data, int length, InputChannelInfo channelInfo)
+            throws IOException, InterruptedException;

Review Comment:
   Why does `FilteredBufferDispatcher` operate on `byte[]` rather than on `Buffer` (so that we can use off-heap memory now or in the future)?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java:
##########
@@ -204,6 +212,12 @@ public void addInputData(
                 int startSeqNum,
                 CloseableIterator<Buffer> data) {}
 
+        @Override
+        public void addInputDataFromSpill(
+                long checkpointId, CloseableIterator<FilteredSpillFile.Chunk> chunks) {
+            IOUtils.closeQuietly(chunks);
+        }
+

Review Comment:
   Why does `ChannelStateWriter` have to be modified?
   Why is the existing `addInputData` not enough?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredBufferStoreCoordinator.java:
##########
@@ -0,0 +1,66 @@
+package org.apache.flink.runtime.checkpoint.channel;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Cross-channel coordinator notified by per-channel {@link
+ * org.apache.flink.runtime.io.network.partition.consumer.RecoveredBufferStore} instances on
+ * lifecycle events. Centralises bookkeeping that spans multiple channels (checkpoint wait-sets,
+ * shared on-disk spill state).
+ *
+ * <p>The {@code onChannel*} callbacks fire from the Task thread <em>outside</em> the calling
+ * store's lock, so implementations may freely acquire their own synchronisation. {@link
+ * #getCurrentDrainHead()} fires <em>inside</em> the calling store's lock so the store can capture a
+ * consistent (readyBuffers, drainHead) pair atomically — implementations must therefore avoid
+ * blocking and must not acquire any lock participating in the store-lock cycle (a plain {@code
+ * volatile} read is the intended implementation).
+ */ +@Internal +public interface RecoveredBufferStoreCoordinator { + + /** + * Position of the next entry the drain bundle will pop from the global FIFO. Returns {@link + * EntryPosition#END} when no disk entries are pending. Must be cheap and lock-free. + */ + EntryPosition getCurrentDrainHead(); + + /** + * Invoked from {@code RecoveredBufferStore#checkpoint} after the store has snapshotted its + * ready buffers. {@code startPos} is the drain-head captured atomically with that snapshot; + * phase-2 uses it as the per-channel cutoff to split spill entries between this channel's Step + * 1 snapshot and the global checkpoint drain. + */ + void onChannelCheckpointStarted( + long checkpointId, InputChannelInfo channelInfo, EntryPosition startPos); Review Comment: Why onChannelCheckpointStarted requires EntryPosition? It looks like something RecoveredBufferStoreCoordinator should know itself? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java: ########## @@ -278,24 +273,56 @@ protected int peekNextBufferSubpartitionIdInternal() throws IOException { @Override public Optional<BufferAndAvailability> getNextBuffer() throws IOException { - final SequenceBuffer next; + // Single critical section so "is recovery done", "is there a priority event", "what is + // the next data type" cannot be torn against the underlying queues. Splitting would let + // producers slip data into receivedBuffers (or drain the store) between segments and + // surface a stale moreAvailable that hides queued buffers from the gate. 
+        final Buffer recoveredBuffer;
+        final SequenceBuffer fromReceivedBuffers;
         final DataType nextDataType;
-        synchronized (receivedBuffers) {
-            checkReadability();
-
-            next = receivedBuffers.poll();
-
-            if (next != null) {
-                totalQueueSizeInBytes -= next.buffer.getSize();
+        synchronized (recoveredStore) {
+            if (!recoveredStore.isEmpty()) {
+                if (hasPendingPriorityEvent) {
+                    fromReceivedBuffers = pollPendingPriorityEvent();
+                    if (fromReceivedBuffers == null) {
+                        // Invariant should keep the flag aligned with priority count; defensive
+                        // yield mirrors pre-refactor behavior.
+                        return Optional.empty();
+                    }
+                    nextDataType = peekNextDataType();
+                    recoveredBuffer = null;
+                } else {
+                    recoveredBuffer = recoveredStore.tryTake();

Review Comment:
   I'm concerned about the complexity on the consuming side (channels). Why can't we add buffers directly to the channels, e.g. to `RemoteInputChannel.receivedBuffers`?
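To make the questions about per-channel dispatch and `byte[]` versus `Buffer` concrete, here is a minimal sketch of a per-channel spill writer that accepts a `java.nio.ByteBuffer` instead of a `byte[]`. `PerChannelSpillWriter` is a hypothetical name, not a Flink or PR class, and `ByteBuffer` only stands in for Flink's `Buffer`. Because one instance is bound to one channel, no `InputChannelInfo` travels with each write; because the parameter is a buffer rather than an array, callers can hand over heap (`ByteBuffer.wrap`) or off-heap (`ByteBuffer.allocateDirect`) memory without first copying it into a `byte[]`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical per-channel spill writer (not a Flink or PR class). One instance is bound to
 * one input channel, so the channel identity is fixed at construction instead of being
 * passed with every write.
 */
final class PerChannelSpillWriter implements AutoCloseable {
    private final FileChannel file;
    private long bytesWritten;

    PerChannelSpillWriter(Path spillFile) throws IOException {
        this.file =
                FileChannel.open(
                        spillFile,
                        StandardOpenOption.CREATE,
                        StandardOpenOption.WRITE,
                        StandardOpenOption.TRUNCATE_EXISTING);
    }

    /**
     * Appends the remaining bytes of {@code data}. Works the same for heap buffers
     * (ByteBuffer.wrap / allocate) and direct, off-heap buffers (ByteBuffer.allocateDirect).
     */
    void write(ByteBuffer data) throws IOException {
        // Defensive loop: WritableByteChannel.write may, in general, write fewer bytes than remain.
        while (data.hasRemaining()) {
            bytesWritten += file.write(data);
        }
    }

    long bytesWritten() {
        return bytesWritten;
    }

    @Override
    public void close() throws IOException {
        file.close();
    }
}
```

The sketch only shows the interface shape; it says nothing about how many spill files per-channel writers would open or how replay ordering across channels would be preserved.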

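On the `EntryPosition` question: the quoted `RecoveredBufferStoreCoordinator` javadoc says the drain head is read inside the store's lock so that the ready-buffer snapshot and the cutoff form one consistent pair. The toy model below illustrates that argument under stated assumptions; it is not the PR's implementation. All names are hypothetical, a `long` index stands in for `EntryPosition`, an `int` for `InputChannelInfo`, and a `String` for a buffer. It assumes every spilled entry is known before draining starts (positions 0..n-1 in FIFO order) and that a single drainer thread moves the head entry into its target store while holding that store's lock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical spilled entry: its position in the global FIFO and its target channel. */
final class ToyEntry {
    final long pos;
    final int channel;
    final String payload;

    ToyEntry(long pos, int channel, String payload) {
        this.pos = pos;
        this.channel = channel;
        this.payload = payload;
    }
}

/** What a store captures at checkpoint time: its ready buffers plus the drain-head cutoff. */
final class ToySnapshot {
    final List<String> ready;
    final long startPos;

    ToySnapshot(List<String> ready, long startPos) {
        this.ready = ready;
        this.startPos = startPos;
    }
}

final class ToyStore {
    final Object lock = new Object();
    final ArrayDeque<String> ready = new ArrayDeque<>();
    private final ToyCoordinator coordinator;

    ToyStore(ToyCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    /** Copies the ready buffers and reads the drain head under the same lock. */
    ToySnapshot checkpoint() {
        synchronized (lock) {
            return new ToySnapshot(new ArrayList<>(ready), coordinator.getCurrentDrainHead());
        }
    }
}

final class ToyCoordinator {
    static final long END = Long.MAX_VALUE; // stand-in for EntryPosition.END
    private final List<ToyEntry> fifo; // assumed fully known before draining starts
    private volatile long drainHead; // written by the single drainer, read lock-free

    ToyCoordinator(List<ToyEntry> entries) {
        this.fifo = new ArrayList<>(entries);
        this.drainHead = fifo.isEmpty() ? END : 0;
    }

    /** Plain volatile read: never blocks and takes no lock. */
    long getCurrentDrainHead() {
        return drainHead;
    }

    /** Single drainer thread: moves the head entry into its store; false when none is left. */
    boolean drainOne(List<ToyStore> stores) {
        long head = drainHead;
        if (head == END) {
            return false;
        }
        ToyEntry entry = fifo.get((int) head);
        ToyStore target = stores.get(entry.channel);
        synchronized (target.lock) { // append and advance atomically w.r.t. that store's checkpoint
            target.ready.add(entry.payload);
            drainHead = head + 1 < fifo.size() ? head + 1 : END;
        }
        return true;
    }

    /** Entries of one channel at or after the cutoff, i.e. those left for the checkpoint drain. */
    List<String> pendingFor(int channel, long startPos) {
        List<String> out = new ArrayList<>();
        for (ToyEntry e : fifo) {
            if (e.channel == channel && e.pos >= startPos) {
                out.add(e.payload);
            }
        }
        return out;
    }
}
```

In this toy, reading the head after releasing the store's lock could miss an entry the drainer moved in between, and reading it before taking the snapshot could count that entry twice. Whether the coordinator could derive the cutoff itself, as the comment suggests, depends on whether it can observe that same locked moment.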