kl0u commented on a change in pull request #13357: URL: https://github.com/apache/flink/pull/13357#discussion_r492107272
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortingThread.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.flink.runtime.operators.sort.CircularElement.EOF_MARKER; +import static org.apache.flink.runtime.operators.sort.CircularElement.SPILLING_MARKER; + +/** + * The thread that sorts filled buffers. + */ +class SortingThread<E> extends ThreadBase<E> { + + /** Logging. */ + private static final Logger LOG = LoggerFactory.getLogger(SortingThread.class); + + private final IndexedSorter sorter; + + /** + * Creates a new sorting thread. + * + * @param exceptionHandler The exception handler to call for all exceptions. + * @param dispatcher The queues used to pass buffers between the threads. + */ + public SortingThread( + ExceptionHandler<IOException> exceptionHandler, StageMessageDispatcher<E> dispatcher) { Review comment: I think the convention is that each argument goes on a separate line. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/StageRunner.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.util.MutableObjectIterator; + +/** + * An interface for different stages of the sorting process. Different stages can communicate via + * the {@link StageMessageDispatcher}. + */ +public interface StageRunner extends AutoCloseable { + /** + * Starts the stage. + */ + void start(); + + /** + * A marker interface for sending messages to different stages. + */ + enum SortStage { + READ, + SORT, + SPILL + } + + /** + * A dispatcher for inter-stage communication. It allows for returning a result to a {@link Sorter} via + * {@link StageMessageDispatcher#sendResult(MutableObjectIterator)} + */ + interface StageMessageDispatcher<E> extends AutoCloseable { + /** + * Sends a message to the given stage. + */ + void send(SortStage stage, CircularElement<E> element); + + /** + * Retrieves and removes the head of the given queue, waiting if necessary + * until an element becomes available. 
+ * + * @return the head of the queue + */ + CircularElement<E> take(SortStage stage) throws InterruptedException; Review comment: We always wrap the `InterruptedException` in an `IOException`. Why not change the signature to throw an `IOException` directly? This will simplify all subsequent calls and give us consistent behaviour across all callers. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java ########## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.util.EmptyMutableObjectIterator; +import org.apache.flink.util.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; + +import static org.apache.flink.runtime.operators.sort.CircularElement.EOF_MARKER; +import static org.apache.flink.runtime.operators.sort.CircularElement.SPILLING_MARKER; + +/** + * The thread that handles the spilling of intermediate results and sets up the merging. It also merges the + * channels until sufficiently few channels remain to perform the final streamed merge. + */ +final class SpillingThread<E> extends ThreadBase<E> { + /** + * An interface for injecting custom behaviour for spilling and merging phases. + * @param <E> Review comment: Good to update this comment about `<E>`. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java ########## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.util.EmptyMutableObjectIterator; +import org.apache.flink.util.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; +import 
java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; + +import static org.apache.flink.runtime.operators.sort.CircularElement.EOF_MARKER; +import static org.apache.flink.runtime.operators.sort.CircularElement.SPILLING_MARKER; + +/** + * The thread that handles the spilling of intermediate results and sets up the merging. It also merges the + * channels until sufficiently few channels remain to perform the final streamed merge. + */ +final class SpillingThread<E> extends ThreadBase<E> { + /** + * An interface for injecting custom behaviour for spilling and merging phases. + * @param <E> + */ + interface SpillingBehaviour<E> { + default void open() {} + + default void close() {} + + /** + * An method that allows adjusting the spilling phase. We can inject e.g. combining the elements while spilling. + */ + void spillBuffer( + CircularElement<E> element, + ChannelWriterOutputView output, + LargeRecordHandler<E> largeRecordHandler) throws IOException; + + /** + * An method that allows adjusting the merging phase. We can inject e.g. combining the spilled elements. + */ + void mergeRecords(MergeIterator<E> mergeIterator, ChannelWriterOutputView output) throws IOException; + } + + /** Logging. */ + private static final Logger LOG = LoggerFactory.getLogger(SpillingThread.class); + + private final MemoryManager memManager; // memory manager to release memory + + private final IOManager ioManager; // I/O manager to create channels + + private final TypeSerializer<E> serializer; // The serializer for the data type + + private final TypeComparator<E> comparator; // The comparator that establishes the order relation. 
+ + private final List<MemorySegment> writeMemory; // memory segments for writing + + private final List<MemorySegment> mergeReadMemory; // memory segments for sorting/reading + + private final int maxFanIn; + + private final SpillChannelManager spillChannelManager; + + private final LargeRecordHandler<E> largeRecordHandler; + + private final SpillingBehaviour<E> spillingBehaviour; + + private boolean spillingBehaviourOpened = false; + + private final int minNumWriteBuffers; + + private final int maxNumWriteBuffers; + + /** + * Creates the spilling thread. + * @param exceptionHandler The exception handler to call for all exceptions. + * @param dispatcher The queues used to pass buffers between the threads. + * @param memManager The memory manager used to allocate buffers for the readers and writers. + * @param ioManager The I/I manager used to instantiate readers and writers from. + * @param serializer + * @param comparator + * @param sortReadMemory + * @param writeMemory + * @param maxNumFileHandles + * @param spillingChannelManager + * @param largeRecordHandler + * @param minNumWriteBuffers + * @param maxNumWriteBuffers + */ + SpillingThread( + ExceptionHandler<IOException> exceptionHandler, + StageMessageDispatcher<E> dispatcher, + MemoryManager memManager, + IOManager ioManager, + TypeSerializer<E> serializer, + TypeComparator<E> comparator, + List<MemorySegment> sortReadMemory, + List<MemorySegment> writeMemory, + int maxNumFileHandles, + SpillChannelManager spillingChannelManager, + LargeRecordHandler<E> largeRecordHandler, + SpillingBehaviour<E> spillingBehaviour, + int minNumWriteBuffers, + int maxNumWriteBuffers) { + super(exceptionHandler, "SortMerger spilling thread", dispatcher); + this.memManager = memManager; + this.ioManager = ioManager; + this.serializer = serializer; + this.comparator = comparator; + this.mergeReadMemory = sortReadMemory; + this.writeMemory = writeMemory; + this.maxFanIn = maxNumFileHandles; + this.spillChannelManager = 
spillingChannelManager; + this.largeRecordHandler = largeRecordHandler; + this.spillingBehaviour = spillingBehaviour; + this.minNumWriteBuffers = minNumWriteBuffers; + this.maxNumWriteBuffers = maxNumWriteBuffers; + } + + /** + * Entry point of the thread. + */ + public void go() throws IOException { + + final Queue<CircularElement<E>> cache = new ArrayDeque<>(); + boolean cacheOnly = readCache(cache); + + // check whether the thread was canceled + if (!isRunning()) { + return; + } + + MutableObjectIterator<E> largeRecords = null; + + // check if we can stay in memory with the large record handler + if (cacheOnly && largeRecordHandler != null && largeRecordHandler.hasData()) { + List<MemorySegment> memoryForLargeRecordSorting = new ArrayList<>(); + + CircularElement<E> circElement; + while ((circElement = this.dispatcher.poll(SortStage.READ)) != null) { + circElement.getBuffer().dispose(); + memoryForLargeRecordSorting.addAll(circElement.getMemory()); + } + + if (memoryForLargeRecordSorting.isEmpty()) { + cacheOnly = false; + LOG.debug("Going to disk-based merge because of large records."); + } else { + LOG.debug("Sorting large records, to add them to in-memory merge."); + largeRecords = largeRecordHandler.finishWriteAndSortKeys(memoryForLargeRecordSorting); + } + } + + // ------------------- In-Memory Merge ------------------------ + if (cacheOnly) { + mergeInMemory(cache, largeRecords); + return; + } + + // ------------------- Spilling Phase ------------------------ + List<ChannelWithBlockCount> channelIDs = startSpilling(cache); + + // ------------------- Merging Phase ------------------------ + + mergeOnDisk(channelIDs); + } + + @Override + public void close() throws InterruptedException { + super.close(); + if (spillingBehaviourOpened) { + this.spillingBehaviour.close(); + this.spillingBehaviourOpened = false; + } + } + + private boolean readCache(Queue<CircularElement<E>> cache) throws IOException { + CircularElement<E> element; + + // ------------------- 
In-Memory Cache ------------------------ + // fill cache + while (isRunning()) { + // take next element from queue + try { + element = this.dispatcher.take(SortStage.SPILL); Review comment: Here we are changing a bit the behaviour of the exception handling compared to what is currently in the master. The change seems reasonable, but I want to point it out for future reference (if needed) and also to verify that this was a conscious change. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombineValueIterator.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.util.TraversableOnceException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * This class implements an iterator over values from a sort buffer. The iterator returns the values of a given + * interval. 
+ */ +final class CombineValueIterator<E> implements Iterator<E>, Iterable<E> { + private static final Logger LOG = LoggerFactory.getLogger(CombineValueIterator.class); Review comment: Nit: I would leave a line empty after each arguments but feel free to ignore this. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java ########## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.util.EmptyMutableObjectIterator; +import org.apache.flink.util.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; + +import static org.apache.flink.runtime.operators.sort.CircularElement.EOF_MARKER; +import static org.apache.flink.runtime.operators.sort.CircularElement.SPILLING_MARKER; + +/** + * The thread that handles the spilling of intermediate results and sets up the merging. It also merges the + * channels until sufficiently few channels remain to perform the final streamed merge. + */ +final class SpillingThread<E> extends ThreadBase<E> { + /** + * An interface for injecting custom behaviour for spilling and merging phases. + * @param <E> + */ + interface SpillingBehaviour<E> { + default void open() {} + + default void close() {} + + /** + * An method that allows adjusting the spilling phase. We can inject e.g. combining the elements while spilling. 
Review comment: "An" -> "A" ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ThreadBase.java ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import java.io.IOException; + +/** + * Base class for all working threads in this sort-merger. The specific threads for reading, sorting, spilling, + * merging, etc... extend this subclass. + * <p> + * The threads are designed to terminate themselves when the task they are set up to do is completed. Further more, + * they terminate immediately when the <code>shutdown()</code> method is called. + */ +abstract class ThreadBase<E> extends Thread implements Thread.UncaughtExceptionHandler, StageRunner { + + /** + * The queue of empty buffer that can be used for reading; + */ + protected final StageMessageDispatcher<E> dispatcher; + + /** + * The exception handler for any problems. + */ + private final ExceptionHandler<IOException> exceptionHandler; + + /** + * The flag marking this thread as alive. + */ + private volatile boolean alive; + + /** + * Creates a new thread. + * + * @param exceptionHandler The exception handler to call for all exceptions. 
+ * @param name The name of the thread. + * @param queues The queues used to pass buffers between the threads. + */ + protected ThreadBase( + ExceptionHandler<IOException> exceptionHandler, + String name, + StageMessageDispatcher<E> queues) { + // thread setup + super(name); Review comment: Why not add `checkNotNull()` or `@Nullable` wherever needed? This will make the contract for each arg clearer. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java ########## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.util.EmptyMutableObjectIterator; +import org.apache.flink.util.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; + +import static org.apache.flink.runtime.operators.sort.CircularElement.EOF_MARKER; +import static org.apache.flink.runtime.operators.sort.CircularElement.SPILLING_MARKER; + +/** + * The thread that handles the spilling of intermediate results and sets up the merging. It also merges the + * channels until sufficiently few channels remain to perform the final streamed merge. + */ +final class SpillingThread<E> extends ThreadBase<E> { + /** + * An interface for injecting custom behaviour for spilling and merging phases. + * @param <E> + */ + interface SpillingBehaviour<E> { + default void open() {} + + default void close() {} + + /** + * An method that allows adjusting the spilling phase. We can inject e.g. combining the elements while spilling. 
+ */ + void spillBuffer( + CircularElement<E> element, + ChannelWriterOutputView output, + LargeRecordHandler<E> largeRecordHandler) throws IOException; + + /** + * An method that allows adjusting the merging phase. We can inject e.g. combining the spilled elements. + */ + void mergeRecords(MergeIterator<E> mergeIterator, ChannelWriterOutputView output) throws IOException; + } + + /** Logging. */ + private static final Logger LOG = LoggerFactory.getLogger(SpillingThread.class); + + private final MemoryManager memManager; // memory manager to release memory + + private final IOManager ioManager; // I/O manager to create channels + + private final TypeSerializer<E> serializer; // The serializer for the data type + + private final TypeComparator<E> comparator; // The comparator that establishes the order relation. + + private final List<MemorySegment> writeMemory; // memory segments for writing + + private final List<MemorySegment> mergeReadMemory; // memory segments for sorting/reading + + private final int maxFanIn; + + private final SpillChannelManager spillChannelManager; + + private final LargeRecordHandler<E> largeRecordHandler; + + private final SpillingBehaviour<E> spillingBehaviour; + + private boolean spillingBehaviourOpened = false; + + private final int minNumWriteBuffers; + + private final int maxNumWriteBuffers; + + /** + * Creates the spilling thread. + * @param exceptionHandler The exception handler to call for all exceptions. + * @param dispatcher The queues used to pass buffers between the threads. + * @param memManager The memory manager used to allocate buffers for the readers and writers. + * @param ioManager The I/I manager used to instantiate readers and writers from. 
+ * @param serializer + * @param comparator + * @param sortReadMemory + * @param writeMemory + * @param maxNumFileHandles + * @param spillingChannelManager + * @param largeRecordHandler + * @param minNumWriteBuffers + * @param maxNumWriteBuffers + */ + SpillingThread( Review comment: Adding some `checkNotNull()` checks? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillChannelManager.java ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; + +import java.io.File; +import java.util.HashSet; +import java.util.Iterator; + +/** + * Channel manager to manage the life cycle of spill channels. + */ +final class SpillChannelManager implements AutoCloseable { + + /** + * Collection of all currently open channels, to be closed and deleted during cleanup. + */ + private final HashSet<FileIOChannel> openChannels; + + /** + * Collection of all temporary files created and to be removed when closing the sorter. 
+ */ + private final HashSet<FileIOChannel.ID> channelsToDeleteAtShutdown; + + private volatile boolean closed; + + public SpillChannelManager() { + this.channelsToDeleteAtShutdown = new HashSet<>(64); + this.openChannels = new HashSet<>(64); + } + + /** + * Adds a channel to the list of channels that are to be removed at shutdown. + * + * @param channel The channel id. + */ + synchronized void registerChannelToBeRemovedAtShudown(FileIOChannel.ID channel) { Review comment: Typo: all the method names are missing a `t` in `Shu**t**down`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
