gianm commented on code in PR #12848:
URL: https://github.com/apache/druid/pull/12848#discussion_r936247636


##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A FrameProcessor is like an incremental version of Runnable that operates on {@link ReadableFrameChannel} and
+ * {@link WritableFrameChannel}.
+ *
+ * It is designed to enable interleaved non-blocking work on a fixed-size thread pool. Typically, this is done using
+ * an instance of {@link FrameProcessorExecutor}.
+ */
+public interface FrameProcessor<T>
+{
+  /**
+   * List of input channels. The positions of channels in this list are used to build the {@code readableInputs} set
+   * provided to {@link #runIncrementally}.
+   */
+  List<ReadableFrameChannel> inputChannels();
+
+  /**
+   * List of output channels.
+   */
+  List<WritableFrameChannel> outputChannels();
+
+  /**
+   * Runs some of the algorithm, without blocking, and either returns a value or a set of input channels
+   * to wait for. This method is called by {@link FrameProcessorExecutor#runFully} when all output channels are
+   * writable. Therefore, it is guaranteed that each output channel can accept at least one frame.
+   *
+   * This method must not read more than one frame from each readable input channel, and must not write more than one
+   * frame to each output channel.
+   *
+   * @param readableInputs channels from {@link #inputChannels()} that are either finished or ready to read.
+   *
+   * @return either a final return value or a set of input channels to wait for. Must be nonnull.
+   */
+  ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws IOException;

Review Comment:
   This comment is the most interesting one so I'm replying to it first 🙂. I 
didn't explain much about how I expected this interface to be used, so I'll try 
to rectify that now.
   
   The goal with all this nonblocking stuff is to enable three things:
   
   1. In dedicated task JVMs with one processing thread, interleave the work 
done by pipelined processors.
   2. In dedicated task JVMs with multiple processing threads, allow some 
concurrency: we can start one processor per input file, and process multiple 
input files in parallel.
   3. In servers that run multiple queries/jobs at once, like the Indexer or a future server that does interactive queries with this code: allow interleaving of work across those queries/jobs. In computing generally, this is the model used by coroutines/goroutines and similar constructs, as you pointed out. Within Druid, it's a more powerful version of what the Historicals do with their processing pool today (class: QueryProcessingPool). More powerful because QueryProcessingPool cannot do incremental work at any granularity finer than a whole segment, whereas frame processors can do incremental work at as fine a granularity as they want.
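   Purely as a sketch of the interleaving model (these are invented names, not Druid classes): each task does one bounded step per call, and a single thread round-robins the tasks until all finish.

```java
// Illustrative sketch only; these are not Druid's classes. It shows the
// cooperative model described above: each task does a bounded amount of
// work per call, and one thread round-robins the tasks until all finish.
import java.util.ArrayDeque;
import java.util.Queue;

class CooperativeSketch
{
  /** Hypothetical stand-in for a frame processor: returns true when done. */
  interface IncrementalTask
  {
    boolean runIncrementally();
  }

  /** Drive every task to completion on one thread, one step at a time. */
  static void runAll(final Queue<IncrementalTask> tasks)
  {
    while (!tasks.isEmpty()) {
      final IncrementalTask task = tasks.poll();
      if (!task.runIncrementally()) {
        tasks.add(task); // not finished; requeue so other tasks get a turn
      }
    }
  }

  /** Two two-step tasks: the log shows their steps interleaving. */
  static String demo()
  {
    final StringBuilder log = new StringBuilder();
    final Queue<IncrementalTask> tasks = new ArrayDeque<>();
    for (final char name : new char[]{'A', 'B'}) {
      final int[] remaining = {2};
      tasks.add(() -> {
        log.append(name);
        return --remaining[0] == 0;
      });
    }
    runAll(tasks);
    return log.toString(); // "ABAB", not "AABB": the tasks interleave
  }
}
```

   With blocking tasks on one thread you'd get "AABB"; the incremental contract is what makes the "ABAB" interleaving possible.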
   
   > I wonder, do we intend to provide some kind of helper classes to manage 
the complex state machine otherwise required to keep track of this state?
   
   You're right that most processors are going to want to maintain some state, 
potentially including a "next output frame" as well as saved input frames from 
their various channels. (For example: FrameChannelMerger does this with a 
`currentFrames` array.)
   
   I don't have anything special in mind to help processors track state. I do share your concern that they can get complicated; I've noticed that myself while implementing processors, and I'm open to ideas for avoiding it. Do you have any pointers to similar approaches that you've seen work in the past?
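   For what it's worth, the state most of these processors carry looks roughly like the toy merge below. All names are invented for illustration; only the one-saved-frame-per-channel idea mirrors FrameChannelMerger's `currentFrames`.

```java
// Toy sketch, not Druid code: typical processor state is one saved "frame"
// per input channel (here just Integers) plus the partial output. Each step
// reads at most one frame per channel and writes at most one output.
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

class MergeStateSketch
{
  private final Queue<Integer>[] channels;  // stand-ins for input channels
  private final Integer[] currentFrames;    // one buffered frame per channel
  private final StringBuilder output = new StringBuilder();

  @SafeVarargs
  MergeStateSketch(final Queue<Integer>... channels)
  {
    this.channels = channels;
    this.currentFrames = new Integer[channels.length];
  }

  /** One incremental step; returns true once everything is drained. */
  boolean runIncrementally()
  {
    for (int i = 0; i < channels.length; i++) {
      if (currentFrames[i] == null && !channels[i].isEmpty()) {
        currentFrames[i] = channels[i].poll(); // at most one read per channel
      }
    }
    int min = -1;
    for (int i = 0; i < channels.length; i++) {
      if (currentFrames[i] != null && (min < 0 || currentFrames[i] < currentFrames[min])) {
        min = i;
      }
    }
    if (min >= 0) {
      output.append(currentFrames[min]); // at most one write per step
      currentFrames[min] = null;
    }
    boolean anythingLeft = false;
    for (int i = 0; i < channels.length; i++) {
      anythingLeft |= currentFrames[i] != null || !channels[i].isEmpty();
    }
    return !anythingLeft;
  }

  /** Merge two sorted channels by repeatedly stepping, as an executor would. */
  static String demo()
  {
    final MergeStateSketch merger = new MergeStateSketch(
        new ArrayDeque<>(List.of(1, 3)),
        new ArrayDeque<>(List.of(2, 4))
    );
    while (!merger.runIncrementally()) {
      // keep stepping until the processor reports completion
    }
    return merger.output.toString(); // "1234": merged in sorted order
  }
}
```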
   
   > Also, might the semantics be better if each call did as much work as it can without blocking? As we discussed, if five inputs are ready, why not just buffer them all rather than churning the CPU just to return, reschedule, and re-call the same processor?
   
   I figure most processors will want to buffer all inputs that are ready, and 
do as much work as possible with those inputs, up to writing one output frame. 
Then they'll need to exit, since the contract only allows writing one frame per 
call. Some things I thought about, but didn't do (yet?):
   
   1. Exiting "early" based on a timer, before doing as much work as possible, 
to enable better interleaving of different workloads.
   2. Adjusting the contract to allow processors to write more than one frame 
per `runIncrementally`, if the output channel is able to accept more than one.
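   To make idea 2 concrete, a hypothetical version of the contract might look like this. The `capacity()` method is invented for illustration; the real WritableFrameChannel exposes no such thing.

```java
// Hypothetical sketch of idea 2, not the actual contract: if the output
// channel advertised spare capacity, one call could write several frames
// instead of exactly one. capacity() and BoundedChannel are invented.
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

class MultiFrameWriteSketch
{
  /** Stand-in output channel with a fixed frame budget. */
  static class BoundedChannel
  {
    final Queue<String> frames = new ArrayDeque<>();
    final int maxFrames;

    BoundedChannel(final int maxFrames)
    {
      this.maxFrames = maxFrames;
    }

    int capacity()
    {
      return maxFrames - frames.size(); // frames we can still accept
    }
  }

  /** Write as many pending frames as the channel can accept right now. */
  static int writeIncrementally(final Iterator<String> pending, final BoundedChannel out)
  {
    int written = 0;
    while (out.capacity() > 0 && pending.hasNext()) {
      out.frames.add(pending.next());
      written++;
    }
    return written; // under the current contract this would always be <= 1
  }

  static int demo()
  {
    final BoundedChannel out = new BoundedChannel(3);
    final Iterator<String> pending = List.of("f1", "f2", "f3", "f4", "f5").iterator();
    return writeIncrementally(pending, out); // 3: bounded by capacity, not by 1
  }
}
```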
   
   Curious what your thoughts are.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

