LakshSingla commented on code in PR #15007: URL: https://github.com/apache/druid/pull/15007#discussion_r1341308769
########## processing/src/main/java/org/apache/druid/frame/processor/manager/ProcessorManager.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.manager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.processor.FrameProcessorExecutor; +import org.apache.druid.java.util.common.guava.Sequence; + +import java.io.Closeable; +import java.util.Optional; +import java.util.function.BiFunction; + +/** + * Used by {@link FrameProcessorExecutor#runAllFully} to manage the launching of processors. Processors returned by + * this class may run concurrently with each other. + * + * This interface allows for simple sequences of processors, such as {@link ProcessorManagers#of(Sequence)}. It also + * allows for situations where later processors depend on the results of earlier processors. (The result of earlier + * processors are made available to the manager through {@link ProcessorAndCallback#onComplete(Object)}.) + * + * Implementations do not need to be thread-safe. 
+ */ +public interface ProcessorManager<T, R> extends Closeable +{ + /** + * Returns the next processor that should be run, along with a callback. The callback is called when the processor + * completes successfully, along with the result of the processor. If the processor fails, the callback is not called. + * + * The callback is called in a thread-safe manner: it will never be called concurrently with another callback, or + * concurrently with a call to "next" or {@link #close()}. + * + * Returns an empty Optional if there are no more processors to run. + * + * @throws java.util.NoSuchElementException if a prior call to this method had returned an empty Optional + */ + ListenableFuture<Optional<ProcessorAndCallback<T>>> next(); + + /** + * Called after all procesors are done, prior to {@link #close()}, to retrieve the result of this computation. + */ + R result(); + + /** + * Called when all processors are done, or when one has failed. + * + * This method releases all resources associated with this manager. After this method is called, no other methods + * are called. Review Comment: nit: in line with the RFC key words ```suggestion * must be called. ``` ########## processing/src/main/java/org/apache/druid/frame/processor/manager/ProcessorManagers.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.manager; + +import com.google.common.collect.Iterators; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; + +import java.util.function.Supplier; + +/** + * Utility functions for creating {@link ProcessorManager}. + */ +public class ProcessorManagers +{ + private ProcessorManagers() + { + // No instantiation. + } + + /** + * Manager with zero processors. Returns the number of processors run. + */ + public static <T> ProcessorManager<T, Long> none() + { + return new SequenceProcessorManager<>(Sequences.empty()); + } + + /** + * Manager with processors derived from a {@link Sequence}. Returns the number of processors run. + */ + public static <T> ProcessorManager<T, Long> of(final Sequence<? extends FrameProcessor<T>> processors) + { + return new SequenceProcessorManager<>(processors); + } + + /** + * Manager with processors derived from an {@link Iterable}. Returns the number of processors run. + */ + public static <T> ProcessorManager<T, Long> of(final Iterable<? extends FrameProcessor<T>> processors) + { + return new SequenceProcessorManager<>(Sequences.simple(processors)); + } + + /** + * Manager with a single processor derived from a {@link Supplier}. Returns the number of processors run. + */ + public static <T> ProcessorManager<T, Long> of(final Supplier<? 
extends FrameProcessor<T>> processors) + { + return new SequenceProcessorManager<>(Sequences.simple(() -> Iterators.singletonIterator(processors.get()))); Review Comment: ```suggestion return of(Sequences.simple(() -> Iterators.singletonIterator(processors.get()))); ``` ########## processing/src/main/java/org/apache/druid/frame/processor/manager/ProcessorManager.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.manager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.processor.FrameProcessorExecutor; +import org.apache.druid.java.util.common.guava.Sequence; + +import java.io.Closeable; +import java.util.Optional; +import java.util.function.BiFunction; + +/** + * Used by {@link FrameProcessorExecutor#runAllFully} to manage the launching of processors. Processors returned by + * this class may run concurrently with each other. + * + * This interface allows for simple sequences of processors, such as {@link ProcessorManagers#of(Sequence)}. It also + * allows for situations where later processors depend on the results of earlier processors. 
(The result of earlier + * processors are made available to the manager through {@link ProcessorAndCallback#onComplete(Object)}.) Review Comment: From the above comment, it seems that the processors can run concurrently; however, here it seems like there is a dependence on ordering. How is that synchronized? Would the underlying processors block until they can run, does the external caller have to do this synchronization, or would `next()` block until we can run the other frame processor? ########## processing/src/main/java/org/apache/druid/frame/processor/manager/ProcessorManager.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.manager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.processor.FrameProcessorExecutor; +import org.apache.druid.java.util.common.guava.Sequence; + +import java.io.Closeable; +import java.util.Optional; +import java.util.function.BiFunction; + +/** + * Used by {@link FrameProcessorExecutor#runAllFully} to manage the launching of processors. Processors returned by + * this class may run concurrently with each other.
+ * + * This interface allows for simple sequences of processors, such as {@link ProcessorManagers#of(Sequence)}. It also + * allows for situations where later processors depend on the results of earlier processors. (The result of earlier + * processors are made available to the manager through {@link ProcessorAndCallback#onComplete(Object)}.) + * + * Implementations do not need to be thread-safe. + */ +public interface ProcessorManager<T, R> extends Closeable +{ + /** + * Returns the next processor that should be run, along with a callback. The callback is called when the processor + * completes successfully, along with the result of the processor. If the processor fails, the callback is not called. + * + * The callback is called in a thread-safe manner: it will never be called concurrently with another callback, or + * concurrently with a call to "next" or {@link #close()}. + * + * Returns an empty Optional if there are no more processors to run. + * + * @throws java.util.NoSuchElementException if a prior call to this method had returned an empty Optional + */ + ListenableFuture<Optional<ProcessorAndCallback<T>>> next(); + + /** + * Called after all procesors are done, prior to {@link #close()}, to retrieve the result of this computation. Review Comment: What happens if this method is called before all processors are done? Is the result undefined, or can the method throw an exception? ########## processing/src/main/java/org/apache/druid/frame/processor/manager/ProcessorManagers.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.manager; + +import com.google.common.collect.Iterators; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; + +import java.util.function.Supplier; + +/** + * Utility functions for creating {@link ProcessorManager}. + */ +public class ProcessorManagers +{ + private ProcessorManagers() + { + // No instantiation. + } + + /** + * Manager with zero processors. Returns the number of processors run. + */ + public static <T> ProcessorManager<T, Long> none() + { + return new SequenceProcessorManager<>(Sequences.empty()); Review Comment: nit, style ```suggestion return of(Sequences.empty()); ``` ########## processing/src/main/java/org/apache/druid/frame/processor/manager/ProcessorManagers.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.manager; + +import com.google.common.collect.Iterators; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; + +import java.util.function.Supplier; + +/** + * Utility functions for creating {@link ProcessorManager}. + */ +public class ProcessorManagers +{ + private ProcessorManagers() + { + // No instantiation. + } + + /** + * Manager with zero processors. Returns the number of processors run. + */ + public static <T> ProcessorManager<T, Long> none() + { + return new SequenceProcessorManager<>(Sequences.empty()); + } + + /** + * Manager with processors derived from a {@link Sequence}. Returns the number of processors run. + */ + public static <T> ProcessorManager<T, Long> of(final Sequence<? extends FrameProcessor<T>> processors) + { + return new SequenceProcessorManager<>(processors); + } + + /** + * Manager with processors derived from an {@link Iterable}. Returns the number of processors run. + */ + public static <T> ProcessorManager<T, Long> of(final Iterable<? 
extends FrameProcessor<T>> processors) + { + return new SequenceProcessorManager<>(Sequences.simple(processors)); Review Comment: style ```suggestion return of(Sequences.simple(processors)); ``` ########## processing/src/main/java/org/apache/druid/frame/processor/manager/SequenceProcessorManager.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.manager; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.utils.CloseableUtils; + +import java.util.NoSuchElementException; +import java.util.Optional; + +/** + * Processor manager based on a {@link Sequence}. Returns the number of processors run. 
+ */ +public class SequenceProcessorManager<T, P extends FrameProcessor<T>> implements ProcessorManager<T, Long> +{ + private final Sequence<P> sequence; + private Yielder<P> yielder; + private boolean done; + private long numProcessors; + + SequenceProcessorManager(final Sequence<P> sequence) + { + this.sequence = sequence; + } + + @Override + public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() + { + initializeYielderIfNeeded(); + + if (done) { + throw new NoSuchElementException(); + } else if (yielder.isDone()) { + done = true; + return Futures.immediateFuture(Optional.empty()); + } else { + final P retVal; + try { + retVal = Preconditions.checkNotNull(yielder.get(), "processor"); + yielder = yielder.next(null); + } + catch (Throwable e) { + // Some problem with yielder.get() or yielder.next(null). Close the yielder and mark us as done. + done = true; + CloseableUtils.closeAndSuppressExceptions(yielder, e::addSuppressed); + yielder = null; Review Comment: Can refactor it like: ```suggestion close(); ``` ########## processing/src/main/java/org/apache/druid/frame/processor/manager/SequenceProcessorManager.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.processor.manager; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.utils.CloseableUtils; + +import java.util.NoSuchElementException; +import java.util.Optional; + +/** + * Processor manager based on a {@link Sequence}. Returns the number of processors run. + */ +public class SequenceProcessorManager<T, P extends FrameProcessor<T>> implements ProcessorManager<T, Long> +{ + private final Sequence<P> sequence; + private Yielder<P> yielder; + private boolean done; + private long numProcessors; + + SequenceProcessorManager(final Sequence<P> sequence) + { + this.sequence = sequence; + } + + @Override + public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() + { + initializeYielderIfNeeded(); + + if (done) { + throw new NoSuchElementException(); + } else if (yielder.isDone()) { + done = true; + return Futures.immediateFuture(Optional.empty()); + } else { + final P retVal; + try { + retVal = Preconditions.checkNotNull(yielder.get(), "processor"); + yielder = yielder.next(null); + } + catch (Throwable e) { + // Some problem with yielder.get() or yielder.next(null). Close the yielder and mark us as done. + done = true; + CloseableUtils.closeAndSuppressExceptions(yielder, e::addSuppressed); + yielder = null; + throw e; + } + + return Futures.immediateFuture(Optional.of(new ProcessorAndCallback<>(retVal, r -> numProcessors++))); + } + } + + @Override + public Long result() Review Comment: Relates to the comment added on the interface of this class, should this method throw if called before `done()` or should it return the value. 
If it returns a value, can callers make some sense of it (e.g. here it returns the number of processors _run so far_), or would the value be meaningless? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
