How do I unsubscribe from the GitBox emails? I've tried to unsubscribe from the
Apache list by sending an email to dev-unsubscr...@iceberg.apache.org.

It's been more than 24h, so I'm not sure if I need to do something else.

Thanks
Greg


On Wed, 12 Dec 2018 at 07:21, GitBox <g...@apache.org> wrote:

> mccheah commented on a change in pull request #45: Lazily submit tasks in
> ParallelIterable and add cancellation.
> URL:
> https://github.com/apache/incubator-iceberg/pull/45#discussion_r240776853
>
>
>
>  ##########
>  File path:
> core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
>  ##########
>  @@ -19,73 +19,124 @@
>
>  package com.netflix.iceberg.util;
>
> +import com.google.common.base.Preconditions;
> +import com.google.common.collect.Iterables;
> +import com.netflix.iceberg.io.CloseableGroup;
> +import java.io.Closeable;
>  import java.util.Iterator;
>  import java.util.NoSuchElementException;
>  import java.util.concurrent.ConcurrentLinkedQueue;
> -import java.util.concurrent.ExecutionException;
>  import java.util.concurrent.ExecutorService;
>  import java.util.concurrent.Future;
> -import java.util.concurrent.TimeUnit;
> -import java.util.concurrent.TimeoutException;
>
> -public class ParallelIterable<T> implements Iterable<T> {
> +public class ParallelIterable<T> extends CloseableGroup implements
> Iterable<T> {
>    private final Iterable<Iterable<T>> iterables;
> -  private final ExecutorService trackingPool;
>    private final ExecutorService workerPool;
>
>    public ParallelIterable(Iterable<Iterable<T>> iterables,
> -                          ExecutorService trackingPool,
>                            ExecutorService workerPool) {
>      this.iterables = iterables;
> -    this.trackingPool = trackingPool;
>      this.workerPool = workerPool;
>    }
>
>    @Override
>    public Iterator<T> iterator() {
> -    return new ParallelIterator<>(iterables, trackingPool, workerPool);
> +    ParallelIterator<T> iter = new ParallelIterator<>(iterables,
> workerPool);
> +    addCloseable(iter);
> +    return iter;
>    }
>
> -  private static class ParallelIterator<T> implements Iterator<T> {
> +  private static class ParallelIterator<T> implements Iterator<T>,
> Closeable {
>
>  Review comment:
>    I'm wondering if there's a more idiomatic way to do this, particularly
> one that doesn't require both:
>    1. Busy waiting. It's generally a red flag in concurrent programming if
> busy waiting is used over alternative primitives like locks, conditions,
> queues, monitors, etc.
>    2. Manual tracking of tasks by index.
>
>    I came up with the following alternative. Apologies that it has to be
> in pseudo-code form; due to the nature of the problem it's pretty hard to
> explain without the code. Let's see how this works out:
>
>    ```
>    class ParallelIterator<T> implements Iterator<T>, Closeable {
>
>      private LinkedList<T> availableValues;
>      private LinkedList<Future<List<T>>> runningTasks;
>      private ExecutorService threadPool;
>      private Iterator<Iterable<T>> pendingValues;
>
>      // Constructor, close(), etc.
>
>      boolean hasNext() {
>        // pendingValues is an Iterator, so check hasNext(), not isEmpty().
>        // Caveat: this can return true when the remaining groups are empty.
>        return !runningTasks.isEmpty() || !availableValues.isEmpty()
>            || pendingValues.hasNext();
>      }
>
>      T next() {
>        if (!availableValues.isEmpty()) {
>          return availableValues.poll();
>        }
>        if (!runningTasks.isEmpty()) {
>          // Blocks until the oldest task finishes. Future.get() throws
>          // checked exceptions that real code would have to handle.
>          availableValues.addAll(runningTasks.poll().get());
>          // Or availableValues.poll() if we don't like recursion
>          return next();
>        }
>        if (pendingValues.hasNext()) {
>          // Buffer / eagerly submit some set of tasks, i.e. lookahead.
>          for (int i = 0; i < TASK_COUNT && pendingValues.hasNext(); i++) {
>            Iterable<T> nextPendingValues = pendingValues.next();
>            Future<List<T>> nextRunningTask = threadPool.submit(
>                () -> ImmutableList.copyOf(nextPendingValues));
>            runningTasks.add(nextRunningTask);
>          }
>          // Recursive call that will now check the running tasks
>          return next();
>        }
>        throw new NoSuchElementException(); // No values remaining
>      }
>    }
>
>    ```
>
>    The general idea is to keep a running iterator over the backing
> iterable. When calling `next()`, submit tasks that are buffered into a
> worker queue of futures; each future represents computing the next group of
> values. Then on `next()`:
>    - Get an available value from a completed task, if possible
>    - Else check the work queue and see if a new batch of values is ready
>    - Otherwise submit more work and wait
>
>    What do you think about this approach? The advantages are:
>    - No busy waiting
>    - No need to maintain indices manually. Everything is done via
> collection primitives (`poll`, `iterator`, etc.)
>
>    There are a few ways this framework can be adjusted. For example, on
> `next`, if we determine that only some minimum number of running tasks
> remain, we can choose to eagerly submit work before the user actually
> requests those values, thereby pipelining the main thread's work on the
> values with the worker threads' work that produces them.
>
> ----------------------------------------------------------------
> This is an automated message from the Apache Git Service.
> To respond to the message, please log on GitHub and use the
> URL above to go to the specific comment.
>
> For queries about this service, please contact Infrastructure at:
> us...@infra.apache.org
>
>
> With regards,
> Apache Git Services
>
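For anyone reading this in the archive: the pipelining adjustment described at the end of the quoted review comment could be sketched roughly as below. This is only an illustration under my own assumptions, not the code from the pull request. The class name `LookaheadIterator`, the `lookahead` parameter, and the choice to wrap failures in `RuntimeException` are all made up for the example. It blocks on `Future.get()` instead of busy waiting, and it tops up the task queue whenever a slot frees up.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; not the ParallelIterator from the pull request.
class LookaheadIterator<T> implements Iterator<T>, Closeable {
  private final Queue<T> availableValues = new LinkedList<>();
  private final Queue<Future<List<T>>> runningTasks = new LinkedList<>();
  private final Iterator<? extends Iterable<T>> pendingValues;
  private final ExecutorService threadPool;
  private final int lookahead; // assumed knob: maximum number of tasks in flight
  private boolean closed = false;

  LookaheadIterator(Iterable<? extends Iterable<T>> iterables,
                    ExecutorService threadPool, int lookahead) {
    if (lookahead < 1) {
      throw new IllegalArgumentException("lookahead must be at least 1");
    }
    this.pendingValues = iterables.iterator();
    this.threadPool = threadPool;
    this.lookahead = lookahead;
  }

  // Submit pending groups until `lookahead` tasks are in flight.
  private void submitPending() {
    while (!closed && runningTasks.size() < lookahead && pendingValues.hasNext()) {
      Iterable<T> group = pendingValues.next();
      Future<List<T>> task = threadPool.submit(() -> {
        List<T> values = new ArrayList<>();
        group.forEach(values::add);
        return values;
      });
      runningTasks.add(task);
    }
  }

  @Override
  public boolean hasNext() {
    submitPending(); // keep workers busy even while buffered values remain
    while (availableValues.isEmpty()) {
      Future<List<T>> task = runningTasks.poll();
      if (task == null) {
        return false; // nothing buffered, running, or pending
      }
      submitPending(); // refill the slot that was just freed
      try {
        availableValues.addAll(task.get()); // blocks; no busy waiting
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting for a task", e);
      } catch (ExecutionException e) {
        throw new RuntimeException("Task failed", e.getCause());
      }
    }
    return true;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return availableValues.remove();
  }

  @Override
  public void close() {
    closed = true; // stop submitting new work
    for (Future<?> task : runningTasks) {
      task.cancel(true);
    }
    runningTasks.clear();
  }
}

class LookaheadDemo {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<List<Integer>> groups = Arrays.asList(
        Arrays.asList(1, 2), Arrays.<Integer>asList(), Arrays.asList(3, 4, 5), Arrays.asList(6));
    List<Integer> seen = new ArrayList<>();
    try (LookaheadIterator<Integer> iter = new LookaheadIterator<>(groups, pool, 2)) {
      while (iter.hasNext()) {
        seen.add(iter.next());
      }
    } finally {
      pool.shutdown();
    }
    System.out.println(seen); // prints [1, 2, 3, 4, 5, 6]
  }
}
```

Because tasks are consumed in submission order, values come back in the same order as the input groups. Unlike the quoted pseudocode, `hasNext()` here drains completed tasks until it finds a value, so a trailing empty group does not make it report true with nothing left to return.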
