How do I unsubscribe from the GitBox emails? I've tried to unsubscribe from the Apache list by sending an email to dev-unsubscr...@iceberg.apache.org.
It's been more than 24 hours, so I'm not sure whether I need to do something else.

Thanks,
Greg

On Wed, 12 Dec 2018 at 07:21, GitBox <g...@apache.org> wrote:

> mccheah commented on a change in pull request #45: Lazily submit tasks in
> ParallelIterable and add cancellation.
> URL: https://github.com/apache/incubator-iceberg/pull/45#discussion_r240776853
>
> ##########
> File path: core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
> ##########
> @@ -19,73 +19,124 @@
>
>  package com.netflix.iceberg.util;
>
> +import com.google.common.base.Preconditions;
> +import com.google.common.collect.Iterables;
> +import com.netflix.iceberg.io.CloseableGroup;
> +import java.io.Closeable;
>  import java.util.Iterator;
>  import java.util.NoSuchElementException;
>  import java.util.concurrent.ConcurrentLinkedQueue;
> -import java.util.concurrent.ExecutionException;
>  import java.util.concurrent.ExecutorService;
>  import java.util.concurrent.Future;
> -import java.util.concurrent.TimeUnit;
> -import java.util.concurrent.TimeoutException;
>
> -public class ParallelIterable<T> implements Iterable<T> {
> +public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
>    private final Iterable<Iterable<T>> iterables;
> -  private final ExecutorService trackingPool;
>    private final ExecutorService workerPool;
>
>    public ParallelIterable(Iterable<Iterable<T>> iterables,
> -                          ExecutorService trackingPool,
>                            ExecutorService workerPool) {
>      this.iterables = iterables;
> -    this.trackingPool = trackingPool;
>      this.workerPool = workerPool;
>    }
>
>    @Override
>    public Iterator<T> iterator() {
> -    return new ParallelIterator<>(iterables, trackingPool, workerPool);
> +    ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
> +    addCloseable(iter);
> +    return iter;
>    }
>
> -  private static class ParallelIterator<T> implements Iterator<T> {
> +  private static class ParallelIterator<T> implements Iterator<T>, Closeable {
>
> Review comment:
> I'm wondering if there's a more idiomatic way to do this, particularly
> one that doesn't require either of the following:
> 1. Busy waiting. In concurrent programming, busy waiting is generally a
> red flag when alternative primitives such as locks, conditions, queues,
> or monitors are available.
> 2. Manual tracking of tasks by index.
>
> I came up with the following alternative. Apologies that it has to be
> in pseudo-code form; due to the nature of the problem it's hard to
> explain without code. Let's see how this works out:
>
> ```
> class ParallelIterator<T> implements Iterator<T>, Closeable {
>
>   private LinkedList<T> availableValues;
>   private LinkedList<Future<List<T>>> runningTasks;
>   private ExecutorService threadPool;
>   private Iterator<Iterable<T>> pendingValues;
>
>   // Constructor etc.
>
>   boolean hasNext() {
>     // Iterator has no isEmpty(); use hasNext() to check for unsubmitted groups
>     return !runningTasks.isEmpty() || !availableValues.isEmpty() ||
>         pendingValues.hasNext();
>   }
>
>   T next() {
>     if (!availableValues.isEmpty()) {
>       return availableValues.poll();
>     }
>     if (!runningTasks.isEmpty()) {
>       // Blocks on the oldest task instead of busy waiting
>       availableValues.addAll(runningTasks.poll().get());
>       return next(); // Or availableValues.poll() if we don't like recursion
>     }
>     if (pendingValues.hasNext()) {
>       // Buffer / eagerly submit some set of tasks, i.e. lookahead.
>       for (int i = 0; i < TASK_COUNT && pendingValues.hasNext(); i++) {
>         Iterable<T> nextPendingValues = pendingValues.next();
>         Future<List<T>> nextRunningTask = threadPool.submit(() ->
>             ImmutableList.copyOf(nextPendingValues));
>         runningTasks.add(nextRunningTask);
>       }
>       return next(); // Recursive call that will now check the running tasks
>     }
>     throw new NoSuchElementException(); // No values remaining
>   }
> }
> ```
>
> The general idea is to keep a running iterator over the backing
> iterable. When `next()` is called, submit tasks that are buffered into a
> worker queue of futures; each future computes the next group of
> values.
> Then on `next()`:
> - Get an available value from a completed task, if possible.
> - Otherwise, check the work queue to see whether a new batch of values is ready.
> - Otherwise, submit more work and wait.
>
> What do you think about this approach? The advantages are:
> - No busy waiting.
> - No need to maintain indices manually. Everything is done via
> collection primitives (`poll`, `iterator`, etc.).
>
> There are a few ways this framework can be adjusted. For example, in
> `next()`, if we determine that only some minimum number of running
> tasks remain, we can eagerly submit work before the user actually
> requests those values, thereby pipelining the main thread's work on
> the values with the worker threads' work that produces them.
>
> ----------------------------------------------------------------
> This is an automated message from the Apache Git Service.
> To respond to the message, please log on GitHub and use the
> URL above to go to the specific comment.
>
> For queries about this service, please contact Infrastructure at:
> us...@infra.apache.org
>
>
> With regards,
> Apache Git Services
>
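For reference, the queue-of-futures approach proposed in the quoted review comment can be sketched as runnable Java. This is a minimal sketch under stated assumptions, not the code that was merged into Iceberg. The class name `LazyParallelIterator` and the `lookahead` parameter (the maximum number of in-flight tasks) are hypothetical. The sketch uses only the JDK instead of Guava's `ImmutableList`, and it assumes that a single consumer thread calls `hasNext()`, `next()`, and `close()`.

```java
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical name: a sketch of the reviewer's proposal, not Iceberg's ParallelIterable.
public class LazyParallelIterator<T> implements Iterator<T>, Closeable {
  private final Iterator<? extends Iterable<T>> pending;        // groups not yet submitted
  private final ExecutorService pool;
  private final int lookahead;                                  // hypothetical: max in-flight tasks
  private final Deque<Future<List<T>>> running = new ArrayDeque<>(); // in submission order
  private final LinkedList<T> available = new LinkedList<>();   // values from completed tasks
  private boolean closed = false;

  public LazyParallelIterator(Iterable<? extends Iterable<T>> groups,
                              ExecutorService pool, int lookahead) {
    this.pending = groups.iterator();
    this.pool = pool;
    this.lookahead = lookahead;
  }

  // Lazily submit groups until `lookahead` tasks are in flight (or no groups remain).
  private void submitMore() {
    while (running.size() < lookahead && pending.hasNext()) {
      Iterable<T> group = pending.next();
      Callable<List<T>> task = () -> {
        List<T> copy = new ArrayList<>();
        group.forEach(copy::add); // materialize the group on a worker thread
        return copy;
      };
      running.add(pool.submit(task));
    }
  }

  @Override
  public boolean hasNext() {
    while (available.isEmpty() && !closed) {
      submitMore();
      Future<List<T>> oldest = running.poll();
      if (oldest == null) {
        return false; // nothing running and nothing left to submit
      }
      try {
        available.addAll(oldest.get()); // blocks on the future; no busy waiting
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      }
      submitMore(); // refill so workers keep producing while the caller consumes
    }
    return !available.isEmpty();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return available.poll();
  }

  @Override
  public void close() {
    // Cancellation: interrupt in-flight tasks and drop unconsumed values.
    closed = true;
    for (Future<List<T>> f : running) {
      f.cancel(true);
    }
    running.clear();
    available.clear();
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<List<Integer>> groups = Arrays.asList(
        Arrays.asList(1, 2), Arrays.<Integer>asList(), Arrays.asList(3), Arrays.asList(4, 5, 6));
    LazyParallelIterator<Integer> it = new LazyParallelIterator<>(groups, pool, 2);
    List<Integer> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    it.close();
    pool.shutdown();
    System.out.println(out); // prints [1, 2, 3, 4, 5, 6]
  }
}
```

Blocking on the oldest `Future` replaces the busy-wait loop, and values come out in submission order because futures are polled FIFO. Refilling up to `lookahead` tasks right after each `get()` is one way to get the pipelining described in the last paragraph of the comment. `close()` cancels in-flight tasks, which is the cancellation the pull request title refers to.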