Switch Infinispan from DIST_ASYNC to DIST_SYNC cache mode and make distributed cache write operations return Futures so callers can wait for completion.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1568fc63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1568fc63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1568fc63 Branch: refs/heads/master Commit: 1568fc63e4fa2e6c12cc38012334357fc0439da2 Parents: 6b9a1c5 Author: Steven Phillips <[email protected]> Authored: Mon Jun 9 02:35:09 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Jun 16 13:50:45 2014 -0700 ---------------------------------------------------------------------- .../apache/drill/exec/cache/DistributedMap.java | 9 +-- .../drill/exec/cache/DistributedMultiMap.java | 4 +- .../drill/exec/cache/infinispan/ICache.java | 62 ++++++++++++++++---- .../drill/exec/cache/local/LocalCache.java | 53 ++++++++++++++--- .../apache/drill/exec/work/foreman/Foreman.java | 24 ++++++-- 5 files changed, 123 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java index 2411434..7d3ca9c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java @@ -18,15 +18,16 @@ package org.apache.drill.exec.cache; import java.util.Map; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public interface DistributedMap<K, V>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class); public V get(K key); - public void put(K key, V value); - public 
void delete(K key); - public void putIfAbsent(K key, V value); - public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit); + public Future<V> put(K key, V value); + public Future<V> delete(K key); + public Future<V> putIfAbsent(K key, V value); + public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit); public Iterable<Map.Entry<K, V>> getLocalEntries(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java index bf06646..214a871 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java @@ -18,9 +18,11 @@ package org.apache.drill.exec.cache; import java.util.Collection; +import java.util.List; +import java.util.concurrent.Future; public interface DistributedMultiMap<K, V> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class); public Collection<V> get(K key); - public void put(K key, V value); + public Future<Boolean> put(K key, V value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java index 651fc04..d3b63db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java 
@@ -25,6 +25,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Maps; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.cache.Counter; @@ -54,7 +55,6 @@ import org.jgroups.protocols.COUNTER; import org.jgroups.protocols.FRAG2; import org.jgroups.stack.ProtocolStack; -import com.google.common.collect.Maps; public class ICache implements DistributedCache{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ICache.class); @@ -69,7 +69,7 @@ public class ICache implements DistributedCache{ String clusterName = config.getString(ExecConstants.SERVICE_NAME); this.local = local; - final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_ASYNC; + final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_SYNC; GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder(); if(!local){ @@ -209,23 +209,23 @@ public class ICache implements DistributedCache{ } @Override - public void delete(K key) { - cache.remove(key); + public Future<V> delete(K key) { + return cache.removeAsync(key); } @Override - public void put(K key, V value) { - cache.put(key, value); + public Future<V> put(K key, V value) { + return cache.putAsync(key, value); } @Override - public void putIfAbsent(K key, V value) { - cache.putIfAbsent(key, value); + public Future<V> putIfAbsent(K key, V value) { + return cache.putIfAbsentAsync(key, value); } @Override - public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) { - cache.putIfAbsent(key, value, ttl, timeUnit); + public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) { + return cache.putIfAbsentAsync(key, value, ttl, timeUnit); } } @@ -247,16 +247,52 @@ public class ICache implements DistributedCache{ } @Override - public void put(K key, V value) { - cache.put(key, new DeltaList<V>(value)); + public Future<Boolean> put(K 
key, V value) { + return new ICacheFuture(cache.putAsync(key, new DeltaList(value))); + } + + } + + public static class ICacheFuture implements Future<Boolean> { + + Future future; + + public ICacheFuture(Future future) { + this.future = future; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(mayInterruptIfRunning); } + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public Boolean get() throws InterruptedException, ExecutionException { + future.get(); + return true; + } + + @Override + public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + future.get(timeout, unit); + return true; + } } - private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta{ + private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta, List<V> { /** The serialVersionUID */ private static final long serialVersionUID = 2176345973026460708L; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java index e61cd76..31ab909 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java @@ -29,8 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import 
org.apache.drill.common.config.DrillConfig; @@ -235,8 +234,43 @@ public class LocalCache implements DistributedCache { } @Override - public void put(K key, V value) { + public Future<Boolean> put(K key, V value) { mmap.put(serialize(key, config.getMode()), serialize(value, config.getMode())); + return new LocalCacheFuture(true); + } + } + + public static class LocalCacheFuture<V> implements Future<V> { + + V value; + + public LocalCacheFuture(V value) { + this.value = value; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return value; + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return value; } } @@ -268,25 +302,30 @@ public class LocalCache implements DistributedCache { } @Override - public void put(K key, V value) { + public Future<V> put(K key, V value) { m.put(serialize(key, config.getMode()), serialize(value, config.getMode())); + return new LocalCacheFuture(value); } @Override - public void putIfAbsent(K key, V value) { + public Future<V> putIfAbsent(K key, V value) { m.putIfAbsent(serialize(key, config.getMode()), serialize(value, config.getMode())); + return new LocalCacheFuture(value); } @Override - public void delete(K key) { + public Future<V> delete(K key) { + V value = get(key); m.remove(serialize(key, config.getMode())); + return new LocalCacheFuture(value); } @Override - public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) { + public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) { m.putIfAbsent(serialize(key, config.getMode()), serialize(value, config.getMode())); logger.warn("Expiration not implemented in local map cache"); + return new LocalCacheFuture<V>(value); } private 
class DeserializingTransformer implements Iterator<Map.Entry<K, V>> { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 3048f77..1629f8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -21,13 +21,20 @@ import io.netty.buffer.ByteBuf; import java.io.Closeable; import java.io.IOException; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.cache.CachedVectorContainer; import org.apache.drill.exec.cache.DistributedCache.CacheConfig; import org.apache.drill.exec.cache.DistributedCache.SerializationMode; import org.apache.drill.exec.coord.DistributedSemaphore; @@ -347,10 +354,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ // store fragments in distributed grid. logger.debug("Storing fragments"); + List<Future<PlanFragment>> queue = new LinkedList<>(); for (PlanFragment f : work.getFragments()) { // store all fragments in grid since they are part of handshake. 
- context.getCache().getMap(FRAGMENT_CACHE).put(f.getHandle(), f); + queue.add(context.getCache().getMap(FRAGMENT_CACHE).put(f.getHandle(), f)); if (f.getLeafFragment()) { leafFragments.add(f); } else { @@ -358,9 +366,14 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } } - int totalFragments = 1 + intermediateFragments.size() + leafFragments.size(); - fragmentManager.getStatus().setTotalFragments(totalFragments); - fragmentManager.getStatus().updateCache(); + for (Future<PlanFragment> f : queue) { + try { + f.get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new ExecutionSetupException("failure while storing plan fragments", e); + } + } + logger.debug("Fragments stored."); logger.debug("Submitting fragments to run."); @@ -368,6 +381,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ logger.debug("Fragments running."); state.updateState(QueryState.PENDING, QueryState.RUNNING); + int totalFragments = 1 + intermediateFragments.size() + leafFragments.size(); + fragmentManager.getStatus().setTotalFragments(totalFragments); + fragmentManager.getStatus().updateCache(); } catch (Exception e) { fail("Failure while setting up query.", e);
