cryptoe commented on code in PR #15007:
URL: https://github.com/apache/druid/pull/15007#discussion_r1335933517
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -1152,7 +1152,7 @@ private void makeShuffleOutputChannelFactory(boolean
isFinalStage)
);
}
- private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>,
I, WorkerClass extends FrameProcessor<T>, T, R> void makeAndRunWorkProcessors()
+ private <FactoryType extends FrameProcessorFactory<T, R, I>, T, R, I> void
makeAndRunWorkProcessors()
Review Comment:
Nit: It would be helpful if we could rename these template variables to
something more in line with what they represent.
For example: I is extraInfo,
R is FrameProcessorResult,
T -> FrameProcessorInput
?
##########
processing/src/main/java/org/apache/druid/frame/processor/manager/SequenceProcessorManager.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.manager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * Processor manager based on a {@link Sequence}. Returns the number of
processors run.
+ */
+public class SequenceProcessorManager<T, P extends FrameProcessor<T>>
implements ProcessorManager<T, Long>
+{
+ private final Sequence<P> sequence;
+ private Yielder<P> yielder;
+ private boolean done;
+ private long numProcessors;
+
+ SequenceProcessorManager(final Sequence<P> sequence)
+ {
+ this.sequence = sequence;
+ }
+
+ @Override
+ public ListenableFuture<Optional<ProcessorAndCallback<T>>> next()
+ {
+ initializeYielderIfNeeded();
+
+ if (done) {
+ throw new NoSuchElementException();
+ } else if (yielder.isDone()) {
+ done = true;
+ return Futures.immediateFuture(Optional.empty());
+ } else {
+ final P retVal;
+ try {
+ retVal = Preconditions.checkNotNull(yielder.get(), "processor");
+ yielder = yielder.next(null);
+ }
+ catch (Throwable e) {
+ // Some problem with yielder.get() or yielder.next(null). Close the
yielder and mark us as done.
+ done = true;
+ CloseableUtils.closeAndSuppressExceptions(yielder, e::addSuppressed);
+ yielder = null;
+ throw e;
+ }
+
+ return Futures.immediateFuture(Optional.of(new
ProcessorAndCallback<>(retVal, r -> numProcessors++)));
+ }
+ }
+
+ @Override
+ public Long result()
+ {
+ return numProcessors;
+ }
+
+ @Override
+ public void close()
+ {
+ if (yielder != null) {
+ // Clean up.
+ CloseableUtils.closeAndWrapExceptions(yielder);
+ done = true;
+ yielder = null;
+ }
Review Comment:
In this close call, it might be that the yielder is not initialized, in
which case the done variable will not be set.
That would allow the following erroneous flow to pass:
```
SPM spm=new SPM();
spm.close();
spm.next();
```
Here, spm.next() should throw a `NoSuchElementException`.
I think done=true should be moved out of this null check.
##########
processing/src/main/java/org/apache/druid/frame/processor/manager/ProcessorManager.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.manager;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.java.util.common.guava.Sequence;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+/**
+ * Used by {@link FrameProcessorExecutor#runAllFully} to manage the launching
of processors. Processors returned by
+ * this class may run concurrently with each other.
+ *
+ * This interface allows for simple sequences of processors, such as {@link
ProcessorManagers#of(Sequence)}. It also
+ * allows for situations where later processors depend on the results of
earlier processors. (The result of earlier
+ * processors are made available to the manager through {@link
ProcessorAndCallback#onComplete(Object)}.)
+ *
+ * Implementations do not need to be thread-safe.
+ */
+public interface ProcessorManager<T, R> extends Closeable
+{
+ /**
+ * Returns the next processor that should be run, along with a callback. The
callback is called when the processor
+ * completes successfully, along with the result of the processor. If the
processor fails, the callback is not called.
+ *
+ * The callback is called in a thread-safe manner: it will never be called
concurrently with another callback, or
+ * concurrently with a call to "next" or {@link #close()}.
+ *
+ * Returns an empty Optional if there are no more processors to run.
+ *
+ * @throws java.util.NoSuchElementException if a prior call to this method
had returned an empty Optional
+ */
+ ListenableFuture<Optional<ProcessorAndCallback<T>>> next();
+
+ /**
+ * Called after all procesors are done, prior to {@link #close()}, to
retrieve the result of this computation.
+ */
+ R result();
+
+ /**
+ * Called when all processors are done, or when one has failed.
+ *
+ * This method releases all resources associated with this manager. After
this method is called, no other methods
+ * are called.
+ */
+ @Override
+ void close();
+
+ default <R2> ProcessorManager<T, R2> withAccumulation(
+ R2 initialResult,
+ BiFunction<R2, T, R2> accumulateFn
Review Comment:
What does T represent here?
R seems to be the final result type.
Is R2 the intermediate result of the processors?
Could you please add Javadoc to this method? It seems like this would be
called once to chain processor managers.
##########
processing/src/main/java/org/apache/druid/frame/processor/manager/ProcessorManager.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.manager;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.java.util.common.guava.Sequence;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+/**
+ * Used by {@link FrameProcessorExecutor#runAllFully} to manage the launching
of processors. Processors returned by
+ * this class may run concurrently with each other.
+ *
+ * This interface allows for simple sequences of processors, such as {@link
ProcessorManagers#of(Sequence)}. It also
+ * allows for situations where later processors depend on the results of
earlier processors. (The result of earlier
+ * processors are made available to the manager through {@link
ProcessorAndCallback#onComplete(Object)}.)
+ *
+ * Implementations do not need to be thread-safe.
+ */
+public interface ProcessorManager<T, R> extends Closeable
+{
+ /**
+ * Returns the next processor that should be run, along with a callback. The
callback is called when the processor
+ * completes successfully, along with the result of the processor. If the
processor fails, the callback is not called.
+ *
+ * The callback is called in a thread-safe manner: it will never be called
concurrently with another callback, or
+ * concurrently with a call to "next" or {@link #close()}.
+ *
+ * Returns an empty Optional if there are no more processors to run.
+ *
+ * @throws java.util.NoSuchElementException if a prior call to this method
had returned an empty Optional
+ */
+ ListenableFuture<Optional<ProcessorAndCallback<T>>> next();
+
+ /**
+ * Called after all procesors are done, prior to {@link #close()}, to
retrieve the result of this computation.
+ */
+ R result();
Review Comment:
Is it part of the contract that result() should be called only after next()
returns an empty Optional?
If yes, then we need to update the SequenceProcessorManager#result method with a
done check.
##########
processing/src/main/java/org/apache/druid/frame/processor/manager/AccumulatingProcessorManager.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.manager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+/**
+ * Processor manager that wraps another {@link ProcessorManager} and
accumulates a result.
+ */
+public class AccumulatingProcessorManager<T, R> implements ProcessorManager<T,
R>
+{
+ private final ProcessorManager<T, ?> delegate;
+ private final BiFunction<R, T, R> accumulateFn;
+ private R currentResult;
+
+ public AccumulatingProcessorManager(
+ ProcessorManager<T, ?> delegate,
+ R initialResult,
+ BiFunction<R, T, R> accumulateFn
+ )
+ {
+ this.delegate = delegate;
+ this.currentResult = Preconditions.checkNotNull(initialResult,
"initialResult");
+ this.accumulateFn = accumulateFn;
+ }
+
+ @Override
+ public ListenableFuture<Optional<ProcessorAndCallback<T>>> next()
+ {
+ return FutureUtils.transform(
Review Comment:
next() should have a currentResult != null check, since the caller can call
close() after the initialization.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.manager.ProcessorAndCallback;
+import org.apache.druid.frame.processor.manager.ProcessorManager;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Manager that chains processors: runs {@link #first} first, then based on
its result, creates {@link #restFuture}
+ * using {@link #restFactory} and runs that next.
+ */
+public class ChainedProcessorManager<A, B, R> implements
ProcessorManager<Object, R>
+{
+ /**
+ * First processor. This one blocks all the others. The reference is set to
null once the processor has been
+ * returned by the channel.
+ */
+ @Nullable
+ private FrameProcessor<A> first;
+
+ /**
+ * Produces {@link #restFuture}.
+ */
+ private final Function<A, ProcessorManager<B, R>> restFactory;
+
+ /**
+ * The rest of the processors. Produced by {@link #restFactory} once {@link
#first} has completed.
+ */
+ private final SettableFuture<ProcessorManager<B, R>> restFuture =
SettableFuture.create();
+
+ public ChainedProcessorManager(
+ final FrameProcessor<A> first,
+ final Function<A, ProcessorManager<B, R>> restFactory
+ )
+ {
+ this.first = Preconditions.checkNotNull(first, "first");
+ this.restFactory = Preconditions.checkNotNull(restFactory, "restFactory");
+ }
+
+ @Override
+ public ListenableFuture<Optional<ProcessorAndCallback<Object>>> next()
+ {
+ if (first != null) {
+ //noinspection unchecked
+ final FrameProcessor<Object> tmp = (FrameProcessor<Object>) first;
+ first = null;
+ return Futures.immediateFuture(Optional.of(new
ProcessorAndCallback<>(tmp, this::onFirstProcessorComplete)));
+ } else {
+ return FutureUtils.transformAsync(
+ restFuture,
+ rest -> (ListenableFuture) rest.next()
+ );
+ }
+ }
+
+ private void onFirstProcessorComplete(final Object firstResult)
+ {
+ //noinspection unchecked
+ restFuture.set(restFactory.apply((A) firstResult));
+ }
+
+ @Override
+ public R result()
+ {
+ return FutureUtils.getUncheckedImmediately(restFuture).result();
+ }
+
+ @Override
+ public void close()
+ {
Review Comment:
I guess we are missing the close case here as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]