leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r207754740
 
 

 ##########
 File path: 
java-util/src/main/java/io/druid/java/util/common/guava/MergeWorkTask.java
 ##########
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.common.guava;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.druid.java.util.common.Pair;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class MergeWorkTask<T> extends ForkJoinTask<Sequence<T>>
+{
+
+  /**
+   * Take a stream of sequences, split them where possible, and do intermediate 
merges. If the input stream is not
+   * a parallel stream, do a traditional merge. The stream attempts to use 
groups of {@code batchSize} to do its work, but this
+   * goal is on a best effort basis. Input streams that cannot be split or are 
not sized or not subsized might not be
+   * eligible for this parallelization. The intermediate merges are done in 
the passed in ForkJoinPool, but the final
+   * merge is still done when the returned sequence is accumulated. The 
intermediate merges are yielded in the order
+   * in which they are ready.
+   *
+   * Exceptions that happen during execution of the merge are passed through 
and bubbled up during the resulting sequence
+   * iteration
+   *
+   * @param mergerFn      The function that will merge a stream of sequences 
into a single sequence. If the baseSequences stream is parallel, this work will 
be done in the FJP, otherwise it will be called directly.
+   * @param baseSequences The sequences that need to be merged
+   * @param batchSize     The input stream should be split down to this number 
if possible. This sets the target number of segments per merge thread work
+   * @param fjp           The ForkJoinPool to do the intermediate merges in.
+   * @param <T>           The result type
+   *
+   * @return A Sequence that will be the merged results of the sub-sequences
+   *
+   * @throws RuntimeException Will throw a RuntimeException while 
iterating through the returned Sequence if a Throwable
+   *                          was encountered in an intermediate merge
+   */
+  public static <T> Sequence<T> parallelMerge(
+      Stream<? extends Sequence<? extends T>> baseSequences,
+      Function<Stream<? extends Sequence<? extends T>>, Sequence<T>> mergerFn,
 
 Review comment:
   It seems to me that it should be `? super Sequence<? extends T>`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to