gianm commented on code in PR #12848:
URL: https://github.com/apache/druid/pull/12848#discussion_r936247636
##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A FrameProcessor is like an incremental version of Runnable that operates on {@link ReadableFrameChannel} and
+ * {@link WritableFrameChannel}.
+ *
+ * It is designed to enable interleaved non-blocking work on a fixed-size thread pool. Typically, this is done using
+ * an instance of {@link FrameProcessorExecutor}.
+ */
+public interface FrameProcessor<T>
+{
+  /**
+   * List of input channels. The positions of channels in this list are used to build the {@code readableInputs} set
+   * provided to {@link #runIncrementally}.
+   */
+  List<ReadableFrameChannel> inputChannels();
+
+  /**
+   * List of output channels.
+   */
+  List<WritableFrameChannel> outputChannels();
+
+  /**
+   * Runs some of the algorithm, without blocking, and either returns a value or a set of input channels
+   * to wait for. This method is called by {@link FrameProcessorExecutor#runFully} when all output channels are
+   * writable. Therefore, it is guaranteed that each output channel can accept at least one frame.
+   *
+   * This method must not read more than one frame from each readable input channel, and must not write more than one
+   * frame to each output channel.
+   *
+   * @param readableInputs channels from {@link #inputChannels()} that are either finished or ready to read.
+   *
+   * @return either a final return value or a set of input channels to wait for. Must be nonnull.
+   */
+  ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws IOException;

Review Comment:
This comment is the most interesting one, so I'm replying to it first 🙂. I didn't explain much about how I expected this interface to be used, so I'll try to rectify that now.

The goal with all this non-blocking stuff is to enable three things:

1. In dedicated task JVMs with one processing thread, interleave the work done by pipelined processors.
2. In dedicated task JVMs with multiple processing threads, allow some concurrency: we can start one processor per input file, and process multiple input files in parallel.
3. In servers that run multiple queries/jobs at once, like the Indexer or a future server that will run interactive queries with this code: allow interleaving of work for multiple queries/jobs.

In the context of computing generally, it's the model used by coroutines, goroutines, and similar things, as you pointed out. In the context of Druid, it's a more powerful version of what the Historicals do with their processing pool today (class: QueryProcessingPool). It's more powerful because the QueryProcessingPool does not allow for incremental work on any granularity other than a whole segment.
The frame processors can do incremental work on as fine a granularity as they want.

> I wonder, do we intend to provide some kind of helper classes to manage the complex state machine otherwise required to keep track of this state?

You're right that most processors are going to want to maintain some state, potentially including a "next output frame" as well as saved input frames from their various channels. (For example, FrameChannelMerger does this with a `currentFrames` array.) I don't have anything special in mind to help the processors track state. I do share your concern that they can get complicated; I've noticed that as I've been implementing processors. I'm open to ideas about how to better avoid it. Do you have any pointers to similar stuff that you've seen work in the past?

> Also, might the semantics be better if each call did as much work as it can without blocking? That sort we discussed, if five inputs are ready, why not just buffer them all rather than churning the CPU just to return, reschedule, and re-call the same processor?

I figure most processors will want to buffer all inputs that are ready, and do as much work as possible with those inputs, up to writing one output frame. Then they'll need to exit, since the contract only allows writing one frame per call.

Some things I thought about, but didn't do (yet?):

1. Exiting "early" based on a timer, before doing as much work as possible, to enable better interleaving of different workloads.
2. Adjusting the contract to allow processors to write more than one frame per `runIncrementally` call, if the output channel is able to accept more than one.

Curious what your thoughts are.
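To make the contract and the state-tracking concern concrete, here is a minimal, self-contained sketch under heavy assumptions. It does not use Druid's real classes. `ToyChannel`, `Step` (a stand-in for `ReturnOrAwait`), `ToyMergeProcessor`, and `runAll` (a stand-in for a one-thread executor) are hypothetical names made up for illustration. Frames are modeled as sorted `int` arrays, and the output channel is assumed to always be writable. The processor merges sorted inputs and keeps saved input frames plus a partially built "next output frame" between calls. On each call it buffers every ready input that needs data (at most one frame per channel per call) and yields as soon as it has written one output frame.

```java
import java.util.*;

// Toy model only: ToyChannel, Step, ToyMergeProcessor and runAll are hypothetical, not Druid classes or APIs.
public class IncrementalMergeSketch {
  /** Hypothetical input channel: queued frames (sorted int arrays) plus a "finished" flag. */
  static final class ToyChannel {
    final Deque<int[]> frames = new ArrayDeque<>();
    boolean finished;
    ToyChannel(int[]... prefilled) { frames.addAll(Arrays.asList(prefilled)); finished = true; }  // demo: pre-filled
    boolean isDone() { return finished && frames.isEmpty(); }
  }

  /** Hypothetical analogue of ReturnOrAwait: finished, or inputs to wait for (empty set = yield, run again). */
  static final class Step {
    final boolean finished;
    final Set<Integer> awaitInputs;
    Step(boolean finished, Set<Integer> awaitInputs) { this.finished = finished; this.awaitInputs = awaitInputs; }
    @Override public String toString() { return finished ? "done" : awaitInputs.isEmpty() ? "runAgain" : "await" + awaitInputs; }
  }

  /** Merges sorted inputs. State survives across calls; each call writes at most one output frame. */
  static final class ToyMergeProcessor {
    final List<ToyChannel> inputs;
    final int rowsPerFrame;
    final int[][] currentFrames;  // saved input frames, similar in spirit to FrameChannelMerger's currentFrames
    final int[] positions;        // read position within each saved frame
    final List<List<Integer>> output = new ArrayList<>();  // toy output channel, assumed always writable
    List<Integer> nextOutputFrame = new ArrayList<>();     // partially built output frame, kept between calls
    ToyMergeProcessor(List<ToyChannel> inputs, int rowsPerFrame) {
      this.inputs = inputs;
      this.rowsPerFrame = rowsPerFrame;
      this.currentFrames = new int[inputs.size()][];
      this.positions = new int[inputs.size()];
    }

    Step runIncrementally(Set<Integer> readableInputs) {
      // Buffer every ready input that needs data, reading at most one frame per channel per call.
      for (int i : readableInputs) {
        if (currentFrames[i] == null && !inputs.get(i).frames.isEmpty()) {
          currentFrames[i] = inputs.get(i).frames.poll();
          positions[i] = 0;
        }
      }
      while (true) {
        Set<Integer> await = new HashSet<>();
        int best = -1;
        for (int i = 0; i < inputs.size(); i++) {
          if (currentFrames[i] == null) {
            if (!inputs.get(i).isDone()) await.add(i);  // next row is unknown until this input has data
          } else if (best < 0 || currentFrames[i][positions[i]] < currentFrames[best][positions[best]]) {
            best = i;
          }
        }
        if (!await.isEmpty()) return new Step(false, await);  // nextOutputFrame is kept for the next call
        if (best < 0) {  // all inputs finished: write the final partial frame, if any
          if (!nextOutputFrame.isEmpty()) output.add(nextOutputFrame);
          return new Step(true, Set.of());
        }
        nextOutputFrame.add(currentFrames[best][positions[best]++]);
        if (positions[best] == currentFrames[best].length) currentFrames[best] = null;
        if (nextOutputFrame.size() == rowsPerFrame) {
          output.add(nextOutputFrame);  // one frame written: yield so other processors get a turn
          nextOutputFrame = new ArrayList<>();
          return new Step(false, Set.of());
        }
      }
    }
  }

  /** Hypothetical one-thread driver: round-robins over unfinished processors and records each outcome. */
  static List<String> runAll(List<ToyMergeProcessor> processors) {
    // Toy channels are pre-filled, so inputs are always readable; a real executor would park awaiting processors.
    List<String> trace = new ArrayList<>();
    boolean[] done = new boolean[processors.size()];
    for (int remaining = processors.size(); remaining > 0; ) {
      for (int p = 0; p < processors.size(); p++) {
        if (done[p]) continue;
        ToyMergeProcessor proc = processors.get(p);
        Set<Integer> readable = new HashSet<>();
        for (int i = 0; i < proc.inputs.size(); i++) {
          if (!proc.inputs.get(i).frames.isEmpty() || proc.inputs.get(i).finished) readable.add(i);
        }
        Step step = proc.runIncrementally(readable);
        trace.add("p" + p + ":" + step);
        if (step.finished) { done[p] = true; remaining--; }
      }
    }
    return trace;
  }

  public static void main(String[] args) {
    ToyMergeProcessor a = new ToyMergeProcessor(
        List.of(new ToyChannel(new int[]{1, 4}, new int[]{7}), new ToyChannel(new int[]{2, 3, 9})), 3);
    ToyMergeProcessor b = new ToyMergeProcessor(List.of(new ToyChannel(new int[]{5}), new ToyChannel(new int[]{0, 6})), 3);
    System.out.println(runAll(List.of(a, b)));  // [p0:runAgain, p1:runAgain, p0:await[0], p1:done, p0:runAgain, p0:done]
    System.out.println(a.output + " " + b.output);  // [[1, 2, 3], [4, 7, 9]] [[0, 5, 6]]
  }
}
```

In the demo run, the first processor returns `await[0]` when the frame it buffered from input 0 runs out partway through a call. It may not read a second frame from that channel in the same call, so it yields and saves its half-built output frame `[4]` for the next call. Treating an empty await set as "yield, then run again" is a modeling choice for this sketch. It does not describe how `ReturnOrAwait` encodes that case.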
