DRILL-1436: Remove use of UDP-based cache for purposes of intermediate
PlanFragment distribution

Includes:
- Remove dependency on Infinispan
- Update fragment initialization to send fragments in batches.
- Update RPC layer to capture UserRpcExceptions and propagate back.
- Send full stack trace in DrillPBError and let foreman node decide on 
formatting.
- Increment control rpc version
- Update systables to report current drillbit and version


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/451dd608
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/451dd608
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/451dd608

Branch: refs/heads/master
Commit: 451dd608a62c08dde26a2da2f085ab11a218ee72
Parents: 6dca24a
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Sun Oct 19 10:46:25 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Sun Oct 26 21:54:38 2014 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   15 -
 .../drill/exec/cache/infinispan/ICache.java     |  339 --
 .../infinispan/JacksonAdvancedExternalizer.java |   71 -
 .../ProtobufAdvancedExternalizer.java           |   66 -
 .../infinispan/VAAdvancedExternalizer.java      |   71 -
 .../cache/infinispan/ZookeeperCacheStore.java   |   66 -
 .../drill/exec/cache/local/LocalCache.java      |  407 ---
 .../drill/exec/compile/ClassTransformer.java    |    5 +-
 .../apache/drill/exec/compile/CodeCompiler.java |   10 +-
 .../apache/drill/exec/ops/FragmentContext.java  |    4 +-
 .../apache/drill/exec/ops/FragmentStats.java    |    9 +-
 .../org/apache/drill/exec/ops/QueryContext.java |    6 -
 .../drill/exec/physical/impl/ScreenCreator.java |    4 +-
 .../BroadcastSenderRootExec.java                |    4 +-
 .../OrderedPartitionRecordBatch.java            |    2 +-
 .../impl/partitionsender/StatusHandler.java     |    5 +-
 .../drill/exec/rpc/BaseRpcOutcomeListener.java  |    2 +
 .../drill/exec/rpc/CoordinationQueue.java       |    4 +-
 .../drill/exec/rpc/OutboundRpcMessage.java      |    9 +-
 .../drill/exec/rpc/RemoteRpcException.java      |   17 +-
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |   51 +-
 .../org/apache/drill/exec/rpc/RpcEncoder.java   |    1 +
 .../org/apache/drill/exec/rpc/RpcException.java |    8 +
 .../apache/drill/exec/rpc/UserRpcException.java |   49 +
 .../exec/rpc/control/ControlRpcConfig.java      |    6 +-
 .../drill/exec/rpc/control/ControlTunnel.java   |   15 +-
 .../drill/exec/rpc/control/WorkEventBus.java    |  102 +-
 .../apache/drill/exec/rpc/data/DataServer.java  |    2 +-
 .../org/apache/drill/exec/server/Drillbit.java  |   13 +-
 .../drill/exec/server/DrillbitContext.java      |   11 +-
 .../drill/exec/server/RemoteServiceSet.java     |   23 +-
 .../server/options/SystemOptionManager.java     |    7 -
 .../drill/exec/store/sys/DrillbitIterator.java  |    4 +
 .../drill/exec/store/sys/SystemTable.java       |    3 +-
 .../drill/exec/store/sys/SystemTablePlugin.java |    2 +
 .../drill/exec/store/sys/VersionIterator.java   |   78 +
 .../org/apache/drill/exec/work/ErrorHelper.java |  215 +-
 .../org/apache/drill/exec/work/WorkManager.java |    5 +-
 .../exec/work/batch/ControlHandlerImpl.java     |   50 +-
 .../exec/work/batch/ControlMessageHandler.java  |    5 +-
 .../apache/drill/exec/work/foreman/Foreman.java |   50 +-
 .../drill/exec/work/foreman/FragmentData.java   |    6 +-
 .../drill/exec/work/foreman/QueryManager.java   |   49 +-
 .../work/fragment/AbstractStatusReporter.java   |    3 +-
 .../java/org/apache/drill/PlanningBase.java     |    8 -
 .../java/org/apache/drill/TestBugFixes.java     |    9 +
 .../exec/cache/TestCacheSerialization.java      |  150 -
 .../exec/compile/TestClassTransformation.java   |    3 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   17 +-
 .../TestOrderedPartitionExchange.java           |    2 +
 .../apache/drill/exec/server/TestBitRpc.java    |    2 +-
 .../vector/complex/writer/TestJsonReader.java   |    4 +-
 .../java/org/apache/drill/jdbc/DrillCursor.java |    2 +-
 .../org/apache/drill/exec/proto/BitControl.java |  755 ++++-
 .../drill/exec/proto/GeneralRPCProtos.java      |  882 +-----
 .../drill/exec/proto/SchemaBitControl.java      |  113 +
 .../exec/proto/SchemaGeneralRPCProtos.java      |  132 -
 .../drill/exec/proto/SchemaUserBitShared.java   |  290 +-
 .../apache/drill/exec/proto/UserBitShared.java  | 2982 ++++++++++++++++--
 .../drill/exec/proto/beans/DrillPBError.java    |   30 +-
 .../exec/proto/beans/ExceptionWrapper.java      |  243 ++
 .../exec/proto/beans/InitializeFragments.java   |  175 +
 .../proto/beans/StackTraceElementWrapper.java   |  251 ++
 protocol/src/main/protobuf/BitControl.proto     |    6 +-
 protocol/src/main/protobuf/GeneralRPC.proto     |    8 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   19 +-
 66 files changed, 5045 insertions(+), 2912 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index d05e4c6..e75f572 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -306,21 +306,6 @@
       <version>2.5.0</version>
     </dependency>
     <dependency>
-      <groupId>com.hazelcast</groupId>
-      <artifactId>hazelcast</artifactId>
-      <version>3.1.4</version>
-    </dependency>
-    <dependency>
-      <groupId>org.infinispan</groupId>
-      <artifactId>infinispan-core</artifactId>
-      <version>6.0.2.Final</version>
-    </dependency>
-    <dependency>
-      <groupId>org.infinispan</groupId>
-      <artifactId>infinispan-tree</artifactId>
-      <version>6.0.2.Final</version>
-    </dependency>
-    <dependency>
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
       <version>2.7.4</version>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
deleted file mode 100644
index 6627a89..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.infinispan;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.Counter;
-import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.DistributedMap;
-import org.apache.drill.exec.cache.DistributedMultiMap;
-import org.apache.drill.exec.cache.SerializationDefinition;
-import org.apache.drill.exec.cache.local.LocalCache.LocalCounterImpl;
-import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.infinispan.Cache;
-import org.infinispan.atomic.Delta;
-import org.infinispan.atomic.DeltaAware;
-import org.infinispan.configuration.cache.CacheMode;
-import org.infinispan.configuration.cache.Configuration;
-import org.infinispan.configuration.cache.ConfigurationBuilder;
-import org.infinispan.configuration.global.GlobalConfigurationBuilder;
-import org.infinispan.manager.DefaultCacheManager;
-import org.infinispan.manager.EmbeddedCacheManager;
-import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
-import org.jgroups.blocks.atomic.CounterService;
-import org.jgroups.fork.ForkChannel;
-import org.jgroups.protocols.COUNTER;
-import org.jgroups.protocols.FRAG2;
-import org.jgroups.stack.ProtocolStack;
-
-import com.google.common.collect.Maps;
-
-
-public class ICache implements DistributedCache{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ICache.class);
-
-  private EmbeddedCacheManager manager;
-  private ForkChannel cacheChannel;
-  private final CounterService counters;
-  private final boolean local;
-  private volatile ConcurrentMap<String, Counter> localCounters;
-
-  public ICache(DrillConfig config, BufferAllocator allocator, boolean local) 
throws Exception {
-    String clusterName = config.getString(ExecConstants.SERVICE_NAME);
-    this.local = local;
-
-    final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_SYNC;
-    GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
-    if(!local){
-      gcb.transport() //
-        .defaultTransport() //
-        .clusterName(clusterName);
-    }
-
-    gcb.serialization() //
-        .addAdvancedExternalizer(new VAAdvancedExternalizer(allocator)) //
-        .addAdvancedExternalizer(new 
JacksonAdvancedExternalizer<>(SerializationDefinition.OPTION, 
config.getMapper())) //
-        .addAdvancedExternalizer(new 
JacksonAdvancedExternalizer<>(SerializationDefinition.STORAGE_PLUGINS, 
config.getMapper())) //
-        .addAdvancedExternalizer(new 
ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_STATUS, 
FragmentStatus.PARSER)) //
-        .addAdvancedExternalizer(new 
ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_HANDLE, 
FragmentHandle.PARSER)) //
-        .addAdvancedExternalizer(new 
ProtobufAdvancedExternalizer<>(SerializationDefinition.PLAN_FRAGMENT, 
PlanFragment.PARSER)) //
-      .build();
-
-    Configuration c = new ConfigurationBuilder() //
-      .clustering() //
-
-      .cacheMode(mode) //
-      .storeAsBinary().enable() //
-      .build();
-    this.manager = new DefaultCacheManager(gcb.build(), c);
-
-    if(!local){
-      JGroupsTransport transport = (JGroupsTransport) 
manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();
-      this.cacheChannel = new ForkChannel(transport.getChannel(), 
"drill-stack", "drill-hijacker", true, ProtocolStack.ABOVE, FRAG2.class, new 
COUNTER());
-      this.counters = new CounterService(this.cacheChannel);
-    }else{
-      this.cacheChannel = null;
-      this.counters = null;
-    }
-  }
-
-
-//  @Override
-//  public <K, V> Map<K, V> getSmallAtomicMap(CacheConfig<K, V> config) {
-//    Cache<String, ?> cache = manager.getCache("atomic-maps");
-//    return AtomicMapLookup.getAtomicMap(cache, config.getName());
-//  }
-
-
-  @Override
-  public void close() throws IOException {
-    manager.stop();
-  }
-
-  @Override
-  public void run() throws DrillbitStartupException {
-    try {
-      if(local){
-        localCounters = Maps.newConcurrentMap();
-        manager.start();
-      }else{
-        cacheChannel.connect("c1");
-      }
-
-    } catch (Exception e) {
-      throw new DrillbitStartupException("Failure while trying to set up 
JGroups.");
-    }
-  }
-
-  @Override
-  public <K, V> DistributedMultiMap<K, V> getMultiMap(CacheConfig<K, V> 
config) {
-    Cache<K, DeltaList<V>> cache = manager.getCache(config.getName());
-    return new IMulti<K, V>(cache, config);
-  }
-
-  @Override
-  public <K, V> DistributedMap<K, V> getMap(CacheConfig<K, V> config) {
-    Cache<K, V> c = manager.getCache(config.getName());
-    return new IMap<K, V>(c, config);
-  }
-
-  @Override
-  public Counter getCounter(String name) {
-    if(local){
-        Counter c = localCounters.get(name);
-        if (c == null) {
-          localCounters.putIfAbsent(name, new LocalCounterImpl());
-          return localCounters.get(name);
-        } else {
-          return c;
-        }
-
-    }else{
-      return new JGroupsCounter(counters.getOrCreateCounter(name, 0));
-    }
-
-  }
-
-  private class JGroupsCounter implements Counter{
-    final org.jgroups.blocks.atomic.Counter inner;
-
-    public JGroupsCounter(org.jgroups.blocks.atomic.Counter inner) {
-      super();
-      this.inner = inner;
-    }
-
-    @Override
-    public long get() {
-      return inner.get();
-    }
-
-    @Override
-    public long incrementAndGet() {
-      return inner.incrementAndGet();
-    }
-
-    @Override
-    public long decrementAndGet() {
-      return inner.decrementAndGet();
-    }
-
-  }
-
-  private class IMap<K, V> implements DistributedMap<K, V>{
-
-    private Cache<K, V> cache;
-    private CacheConfig<K, V> config;
-
-    public IMap(Cache<K, V> cache, CacheConfig<K, V> config) {
-      super();
-      this.cache = cache;
-      this.config = config;
-    }
-
-    @Override
-    public Iterable<Entry<K, V>> getLocalEntries() {
-      return cache.entrySet();
-    }
-
-    @Override
-    public V get(K key) {
-      return cache.get(key);
-    }
-
-    @Override
-    public Future<V> delete(K key) {
-      return cache.removeAsync(key);
-    }
-
-    @Override
-    public Future<V> put(K key, V value) {
-      return cache.putAsync(key, value);
-    }
-
-    @Override
-    public Future<V> putIfAbsent(K key, V value) {
-      return cache.putIfAbsentAsync(key, value);
-    }
-
-    @Override
-    public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
-      return cache.putIfAbsentAsync(key, value, ttl, timeUnit);
-    }
-
-  }
-
-  private class IMulti<K, V> implements DistributedMultiMap<K, V>{
-
-    private Cache<K, DeltaList<V>> cache;
-    private CacheConfig<K, V> config;
-
-    public IMulti(Cache<K, DeltaList<V>> cache, CacheConfig<K, V> config) {
-      super();
-      this.cache = cache;
-      this.config = config;
-    }
-
-    @Override
-    public Collection<V> get(K key) {
-      return cache.get(key);
-    }
-
-    @Override
-    public Future<Boolean> put(K key, V value) {
-      return new ICacheFuture(cache.putAsync(key, new DeltaList(value)));
-    }
-
-  }
-
-  public static class ICacheFuture implements Future<Boolean> {
-
-    Future future;
-
-    public ICacheFuture(Future future) {
-      this.future = future;
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      return future.cancel(mayInterruptIfRunning);
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return future.isCancelled();
-    }
-
-    @Override
-    public boolean isDone() {
-      return future.isDone();
-    }
-
-    @Override
-    public Boolean get() throws InterruptedException, ExecutionException {
-      future.get();
-      return true;
-    }
-
-    @Override
-    public Boolean get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
-      future.get(timeout, unit);
-      return true;
-    }
-  }
-
-
-
-
-  private static class DeltaList<V> extends LinkedList<V> implements 
DeltaAware, Delta, List<V> {
-
-    /** The serialVersionUID */
-    private static final long serialVersionUID = 2176345973026460708L;
-
-    public DeltaList(Collection<? extends V> c) {
-       super(c);
-    }
-
-    public DeltaList(V obj) {
-       super();
-       add(obj);
-    }
-
-    @Override
-    public Delta delta() {
-       return new DeltaList<V>(this);
-    }
-
-    @Override
-    public void commit() {
-       this.clear();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public DeltaAware merge(DeltaAware d) {
-       List<V> other = null;
-       if (d != null && d instanceof DeltaList) {
-          other = (List<V>) d;
-          for (V e : this) {
-             other.add(e);
-          }
-          return (DeltaAware) other;
-       } else {
-          return this;
-       }
-    }
- }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
deleted file mode 100644
index 55633ab..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.infinispan;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collections;
-import java.util.Set;
-
-import org.apache.drill.exec.cache.SerializationDefinition;
-import org.infinispan.commons.marshall.AdvancedExternalizer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class JacksonAdvancedExternalizer<T> implements AdvancedExternalizer<T> 
 {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JacksonAdvancedExternalizer.class);
-
-  private final Class<?> clazz;
-  private final ObjectMapper mapper;
-  private final int id;
-
-  public JacksonAdvancedExternalizer(SerializationDefinition def, ObjectMapper 
mapper){
-    this.clazz =  def.clazz;
-    this.mapper = mapper;
-    this.id = def.id;
-  }
-
-  @Override
-  public T readObject(ObjectInput in) throws IOException, 
ClassNotFoundException {
-    byte[] bytes = new byte[in.readInt()];
-    in.readFully(bytes);
-    return (T) mapper.readValue(bytes, clazz);
-  }
-
-  @Override
-  public void writeObject(ObjectOutput out, T object) throws IOException {
-    byte[] bytes = mapper.writeValueAsBytes(object);
-    out.writeInt(bytes.length);
-    out.write(bytes);
-  }
-
-  @Override
-  public Integer getId() {
-    return id;
-  }
-
-  @Override
-  public Set<Class<? extends T>> getTypeClasses() {
-    return (Set<Class<? extends T>>) (Object) Collections.singleton(clazz);
-  }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
deleted file mode 100644
index 7b638ee..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.infinispan;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collections;
-import java.util.Set;
-
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.exec.cache.SerializationDefinition;
-import org.infinispan.commons.marshall.AdvancedExternalizer;
-
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-public class ProtobufAdvancedExternalizer<T extends Message> implements 
AdvancedExternalizer<T>  {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProtobufAdvancedExternalizer.class);
-
-  private final Class<?> clazz;
-  private final int id;
-  private final Parser<T> parser;
-
-  public ProtobufAdvancedExternalizer(SerializationDefinition def, Parser<T> 
parser){
-    this.clazz =  def.clazz;
-    this.parser = parser;
-    this.id = def.id;
-  }
-
-  @Override
-  public T readObject(ObjectInput in) throws IOException, 
ClassNotFoundException {
-    return parser.parseFrom(DataInputInputStream.constructInputStream(in));
-  }
-
-  @Override
-  public void writeObject(ObjectOutput out, T object) throws IOException {
-    out.write(object.toByteArray());
-  }
-
-  @Override
-  public Integer getId() {
-    return id;
-  }
-
-  @Override
-  public Set<Class<? extends T>> getTypeClasses() {
-    return (Set<Class<? extends T>>) (Object) Collections.singleton(clazz);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
deleted file mode 100644
index f072628..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.infinispan;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Set;
-
-import org.apache.drill.exec.cache.CachedVectorContainer;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.infinispan.commons.marshall.AdvancedExternalizer;
-
-import com.google.common.collect.ImmutableSet;
-
-public class VAAdvancedExternalizer implements 
AdvancedExternalizer<CachedVectorContainer> {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(VAAdvancedExternalizer.class);
-
-  private BufferAllocator allocator;
-
-
-  public VAAdvancedExternalizer(BufferAllocator allocator) {
-    super();
-    this.allocator = allocator;
-  }
-
-  static final Set<Class<? extends CachedVectorContainer>> CLASSES = //
-      (Set<Class<? extends CachedVectorContainer>>) //
-      (Object) ImmutableSet.of(CachedVectorContainer.class);
-
-  @Override
-  public CachedVectorContainer readObject(ObjectInput in) throws IOException, 
ClassNotFoundException {
-    int length = in.readInt();
-    byte[] b = new byte[length];
-    in.read(b);
-    CachedVectorContainer va = new CachedVectorContainer(b, allocator);
-    return va;
-  }
-
-  @Override
-  public void writeObject(ObjectOutput out, CachedVectorContainer va) throws 
IOException {
-    out.writeInt(va.getData().length);
-    out.write(va.getData());
-  }
-
-  @Override
-  public Integer getId() {
-    // magic number for this class, assume drill uses 3001-3100.
-    return 3001;
-  }
-
-  @Override
-  public Set<Class<? extends CachedVectorContainer>> getTypeClasses() {
-    return CLASSES;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
deleted file mode 100644
index 46d4eca..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.infinispan;
-
-import org.infinispan.marshall.core.MarshalledEntry;
-import org.infinispan.persistence.spi.ExternalStore;
-import org.infinispan.persistence.spi.InitializationContext;
-
-/**
- * Stores the cached objects in zookeeper.  Objects are stored in 
/start/cache_name/key_name = data
- * @param <K>
- * @param <V>
- */
-public class ZookeeperCacheStore<K, V> implements ExternalStore<K, V>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ZookeeperCacheStore.class);
-
-  private String cacheName;
-
-  @Override
-  public void init(InitializationContext ctx) {
-    ctx.getConfiguration();
-
-  }
-
-  @Override
-  public MarshalledEntry<K, V> load(K key) {
-    return null;
-  }
-
-  @Override
-  public boolean contains(K key) {
-    return false;
-  }
-
-  @Override
-  public void start() {
-  }
-
-  @Override
-  public void stop() {
-  }
-
-  @Override
-  public void write(MarshalledEntry<K, V> entry) {
-  }
-
-  @Override
-  public boolean delete(K key) {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
deleted file mode 100644
index 99ead1c..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.local;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.cache.Counter;
-import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.DistributedMap;
-import org.apache.drill.exec.cache.DistributedMultiMap;
-import org.apache.drill.exec.cache.DrillSerializable;
-import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimaps;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-public class LocalCache implements DistributedCache {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LocalCache.class);
-
-  private volatile ConcurrentMap<CacheConfig<?, ?>, DistributedMap<?, ?>> maps;
-  private volatile ConcurrentMap<CacheConfig<?, ?>, DistributedMultiMap<?, ?>> 
multiMaps;
-  private volatile ConcurrentMap<String, Counter> counters;
-  private static final BufferAllocator allocator = new 
TopLevelAllocator(DrillConfig.create());
-
-  private static final ObjectMapper mapper = DrillConfig.create().getMapper();
-
-  @Override
-  public void close() throws IOException {
-
-  }
-
-  @Override
-  public void run() throws DrillbitStartupException {
-    maps = Maps.newConcurrentMap();
-    multiMaps = Maps.newConcurrentMap();
-    counters = Maps.newConcurrentMap();
-  }
-
-  @Override
-  public <K, V> DistributedMultiMap<K, V> getMultiMap(CacheConfig<K, V> 
config) {
-    DistributedMultiMap<K, V> mmap = (DistributedMultiMap<K, V>) 
multiMaps.get(config);
-    if (mmap == null) {
-      multiMaps.putIfAbsent(config, new LocalDistributedMultiMapImpl<K, 
V>(config));
-      return (DistributedMultiMap<K, V>) multiMaps.get(config);
-    } else {
-      return mmap;
-    }
-  }
-
-  @Override
-  public <K, V> DistributedMap<K, V> getMap(CacheConfig<K, V> config) {
-    DistributedMap<K, V> m = (DistributedMap<K, V>) maps.get(config);
-    if (m == null) {
-      maps.putIfAbsent(config, new LocalDistributedMapImpl<K, V>(config));
-      return (DistributedMap<K, V>) maps.get(config);
-    } else {
-      return m;
-    }
-  }
-
-  @Override
-  public Counter getCounter(String name) {
-    Counter c = counters.get(name);
-    if (c == null) {
-      counters.putIfAbsent(name, new LocalCounterImpl());
-      return counters.get(name);
-    } else {
-      return c;
-    }
-  }
-
-  private static BytesHolder serialize(Object obj, SerializationMode mode) {
-    if (obj instanceof String) {
-      return new BytesHolder( ((String)obj).getBytes(Charsets.UTF_8));
-    }
-    try{
-      switch (mode) {
-      case DRILL_SERIALIZIABLE: {
-        ByteArrayDataOutput out = ByteStreams.newDataOutput();
-        OutputStream outputStream = 
DataOutputOutputStream.constructOutputStream(out);
-        ((DrillSerializable)obj).writeToStream(outputStream);
-        outputStream.flush();
-        return new BytesHolder(out.toByteArray());
-      }
-
-      case JACKSON: {
-        ByteArrayDataOutput out = ByteStreams.newDataOutput();
-        out.write(mapper.writeValueAsBytes(obj));
-        return new BytesHolder(out.toByteArray());
-      }
-
-      case PROTOBUF:
-        return new BytesHolder(( (Message) obj).toByteArray());
-
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    throw new UnsupportedOperationException();
-  }
-
-  private static <V> V deserialize(BytesHolder b, SerializationMode mode, 
Class<V> clazz) {
-    byte[] bytes = b.bytes;
-    try {
-
-      if (clazz == String.class) {
-        return (V) new String(bytes, Charsets.UTF_8);
-      }
-
-      switch (mode) {
-      case DRILL_SERIALIZIABLE: {
-        InputStream inputStream = new ByteArrayInputStream(bytes);
-        V obj = 
clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
-        ((DrillSerializable) obj).readFromStream(inputStream);
-        return obj;
-      }
-
-      case JACKSON: {
-        return (V) mapper.readValue(bytes, clazz);
-      }
-
-      case PROTOBUF: {
-        Parser<V> parser = null;
-        for (Field f : clazz.getFields()) {
-          if (f.getName().equals("PARSER") && 
Modifier.isStatic(f.getModifiers())) {
-            parser = (Parser<V>) f.get(null);
-          }
-        }
-        if (parser == null) {
-          throw new UnsupportedOperationException(String.format("Unable to 
find parser for class %s.", clazz.getName()));
-        }
-        InputStream inputStream = new ByteArrayInputStream(bytes);
-        return parser.parseFrom(inputStream);
-      }
-
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    throw new UnsupportedOperationException();
-  }
-
-  private static class BytesHolder {
-    final byte[] bytes;
-    public BytesHolder(byte[] bytes) {
-      this.bytes = bytes;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + Arrays.hashCode(bytes);
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      BytesHolder other = (BytesHolder) obj;
-      if (!Arrays.equals(bytes, other.bytes)) {
-        return false;
-      }
-      return true;
-    }
-
-  }
-
-  static class LocalDistributedMultiMapImpl<K, V> implements 
DistributedMultiMap<K, V> {
-    private ListMultimap<BytesHolder, BytesHolder> mmap;
-    private CacheConfig<K, V> config;
-
-    public LocalDistributedMultiMapImpl(CacheConfig<K, V> config) {
-      ListMultimap<BytesHolder, BytesHolder> innerMap = 
ArrayListMultimap.create();
-      mmap = Multimaps.synchronizedListMultimap(innerMap);
-      this.config = config;
-    }
-
-    @Override
-    public Collection<V> get(K key) {
-      List<V> list = Lists.newArrayList();
-      for (BytesHolder o : mmap.get(serialize(key, config.getMode()))) {
-        list.add(deserialize(o, config.getMode(), config.getValueClass()));
-      }
-      return list;
-    }
-
-    @Override
-    public Future<Boolean> put(K key, V value) {
-      mmap.put(serialize(key, config.getMode()), serialize(value, 
config.getMode()));
-      return new LocalCacheFuture(true);
-    }
-  }
-
-  public static class LocalCacheFuture<V> implements Future<V> {
-
-    V value;
-
-    public LocalCacheFuture(V value) {
-      this.value = value;
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      return false;
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return false;
-    }
-
-    @Override
-    public boolean isDone() {
-      return true;
-    }
-
-    @Override
-    public V get() throws InterruptedException, ExecutionException {
-      return value;
-    }
-
-    @Override
-    public V get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
-      return value;
-    }
-  }
-
-  public static class LocalDistributedMapImpl<K, V> implements 
DistributedMap<K, V> {
-    protected ConcurrentMap<BytesHolder, BytesHolder> m;
-    protected CacheConfig<K, V> config;
-
-    public LocalDistributedMapImpl(CacheConfig<K, V> config) {
-      m = Maps.newConcurrentMap();
-      this.config = config;
-    }
-
-    @Override
-    public V get(K key) {
-      BytesHolder b = m.get(serialize(key, config.getMode()));
-      if (b == null) {
-        return null;
-      }
-      return (V) deserialize(b, config.getMode(), config.getValueClass());
-    }
-
-    @Override
-    public Iterable<Entry<K, V>> getLocalEntries() {
-      return new Iterable<Entry<K, V>>() {
-        @Override
-        public Iterator<Entry<K, V>> iterator() {
-          return new DeserializingTransformer(m.entrySet().iterator());
-        }
-      };
-
-    }
-
-    @Override
-    public Future<V> put(K key, V value) {
-      m.put(serialize(key, config.getMode()), serialize(value, 
config.getMode()));
-      return new LocalCacheFuture(value);
-    }
-
-
-    @Override
-    public Future<V> putIfAbsent(K key, V value) {
-      m.putIfAbsent(serialize(key, config.getMode()), serialize(value, 
config.getMode()));
-      return new LocalCacheFuture(value);
-    }
-
-    @Override
-    public Future<V> delete(K key) {
-      V value = get(key);
-      m.remove(serialize(key, config.getMode()));
-      return new LocalCacheFuture(value);
-    }
-
-    @Override
-    public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
-      m.putIfAbsent(serialize(key, config.getMode()), serialize(value, 
config.getMode()));
-      logger.warn("Expiration not implemented in local map cache");
-      return new LocalCacheFuture<V>(value);
-    }
-
-    private class DeserializingTransformer implements Iterator<Map.Entry<K, 
V>> {
-      private Iterator<Map.Entry<BytesHolder, BytesHolder>> inner;
-
-      public DeserializingTransformer(Iterator<Entry<BytesHolder, 
BytesHolder>> inner) {
-        super();
-        this.inner = inner;
-      }
-
-      @Override
-      public boolean hasNext() {
-        return inner.hasNext();
-      }
-
-      @Override
-      public Entry<K, V> next() {
-        return newEntry(inner.next());
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-
-      public Entry<K, V> newEntry(final Entry<BytesHolder, BytesHolder> input) 
{
-        return new Map.Entry<K, V>() {
-
-          @Override
-          public K getKey() {
-            return deserialize(input.getKey(), config.getMode(), 
config.getKeyClass());
-          }
-
-          @Override
-          public V getValue() {
-            return deserialize(input.getValue(), config.getMode(), 
config.getValueClass());
-          }
-
-          @Override
-          public V setValue(V value) {
-            throw new UnsupportedOperationException();
-          }
-
-        };
-      }
-
-    }
-
-  }
-
-  public static class LocalCounterImpl implements Counter {
-    private AtomicLong al = new AtomicLong();
-
-    @Override
-    public long get() {
-      return al.get();
-    }
-
-    @Override
-    public long incrementAndGet() {
-      return al.incrementAndGet();
-    }
-
-    @Override
-    public long decrementAndGet() {
-      return al.decrementAndGet();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index 2d69ca3..52d9e34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -24,7 +24,6 @@ import java.util.Set;
 
 import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.common.util.FileUtils;
-import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.compile.MergeAdapter.MergedClassResult;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.codehaus.commons.compiler.CompileException;
@@ -40,10 +39,8 @@ public class ClassTransformer {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ClassTransformer.class);
 
   private final ByteCodeLoader byteCodeLoader = new ByteCodeLoader();
-  private final DistributedCache cache;
 
-  public ClassTransformer(DistributedCache cache) {
-    this.cache = cache;
+  public ClassTransformer() {
   }
 
   public static class ClassSet{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
index a9b0c61..5628ea3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.local.LocalCache;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -37,14 +35,12 @@ public class CodeCompiler {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CodeCompiler.class);
 
   private final ClassTransformer transformer;
-  private final DistributedCache distributedCache;
   private final LoadingCache<CodeGenerator<?>, GeneratedClassEntry>  cache;
   private final DrillConfig config;
   private final OptionManager systemOptionManager;
 
-  public CodeCompiler(DrillConfig config, DistributedCache distributedCache, 
OptionManager systemOptionManager){
-    this.transformer = new ClassTransformer(distributedCache);
-    this.distributedCache = distributedCache;
+  public CodeCompiler(DrillConfig config, OptionManager systemOptionManager){
+    this.transformer = new ClassTransformer();
     this.cache = CacheBuilder //
         .newBuilder() //
         .maximumSize(1000) //
@@ -87,6 +83,6 @@ public class CodeCompiler {
   }
 
   public static CodeCompiler getTestCompiler(DrillConfig c) throws IOException{
-    return new CodeCompiler(c, new LocalCache(), new SystemOptionManager(c, 
new LocalPStoreProvider(c)).init());
+    return new CodeCompiler(c, new SystemOptionManager(c, new 
LocalPStoreProvider(c)).init());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 0564c1a..9b78c1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -68,7 +68,6 @@ public class FragmentContext implements Closeable {
   private final FragmentStats stats;
   private final FunctionImplementationRegistry funcRegistry;
   private final QueryClassLoader loader;
-  private final ClassTransformer transformer;
   private final BufferAllocator allocator;
   private final PlanFragment fragment;
   private List<Thread> daemonThreads = Lists.newLinkedList();
@@ -85,8 +84,7 @@ public class FragmentContext implements Closeable {
 
   public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, 
UserClientConnection connection,
       FunctionImplementationRegistry funcRegistry) throws 
OutOfMemoryException, ExecutionSetupException {
-    this.transformer = new ClassTransformer(dbContext.getCache());
-    this.stats = new FragmentStats(dbContext.getMetrics());
+    this.stats = new FragmentStats(dbContext.getMetrics(), 
fragment.getAssignment());
     this.context = dbContext;
     this.connection = connection;
     this.fragment = fragment;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index 22872f9..4431235 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.ops;
 import java.util.List;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 
 import com.codahale.metrics.MetricRegistry;
@@ -28,19 +29,19 @@ import com.google.common.collect.Lists;
 public class FragmentStats {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentStats.class);
 
-
   private List<OperatorStats> operators = Lists.newArrayList();
   private final long startTime;
+  private final DrillbitEndpoint endpoint;
 
-  public FragmentStats(MetricRegistry metrics) {
+  public FragmentStats(MetricRegistry metrics, DrillbitEndpoint endpoint) {
     this.startTime = System.currentTimeMillis();
+    this.endpoint = endpoint;
   }
 
   public void addMetricsToStatus(MinorFragmentProfile.Builder prfB) {
-
     prfB.setStartTime(startTime);
     prfB.setEndTime(System.currentTimeMillis());
-
+    prfB.setEndpoint(endpoint);
     for(OperatorStats o : operators){
       prfB.addOperatorProfile(o.getProfile());
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 1ad144d..4b290a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -23,7 +23,6 @@ import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -112,11 +111,6 @@ public class QueryContext{
     return drillbitContext.getStorage();
   }
 
-
-  public DistributedCache getCache(){
-    return drillbitContext.getCache();
-  }
-
   public Collection<DrillbitEndpoint> getActiveEndpoints(){
     return drillbitContext.getBits();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index bd15ac9..868eb6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -104,7 +104,7 @@ public class ScreenCreator implements RootCreator<Screen>{
               .setQueryId(context.getHandle().getQueryId()) //
               .setRowCount(0) //
               .setQueryState(QueryState.FAILED)
-              .addError(ErrorHelper.logAndConvertError(context.getIdentity(), 
"Screen received stop request sent.",
+              
.addError(ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Query 
stopeed.",
                 context.getFailureCause(), logger, verbose))
               .setDef(RecordBatchDef.getDefaultInstance()) //
               .setIsLastChunk(true) //
@@ -203,7 +203,7 @@ public class ScreenCreator implements RootCreator<Screen>{
         sendCount.decrement();
         logger.error("Failure while sending data to user.", ex);
         boolean verbose = 
context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
-        ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while 
sending fragment to client.", ex, logger,
+        ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Failure 
while sending fragment to client.", ex, logger,
           verbose);
         ok = false;
         this.ex = ex;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index c594e70..3c8e551 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -185,9 +185,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
     public void failed(RpcException ex) {
       sendCount.decrement();
       logger.error("Failure while sending data to user.", ex);
-      boolean verbose = 
context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
-      ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while 
sending fragment to client.", ex, logger,
-        verbose);
+      ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while 
sending fragment to client.", ex, logger);
       ok = false;
       this.ex = ex;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index aecf363..a062074 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -141,7 +141,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     this.samplingFactor = pop.getSamplingFactor();
     this.completionFactor = pop.getCompletionFactor();
 
-    DistributedCache cache = context.getDrillbitContext().getCache();
+    DistributedCache cache = null;
     this.mmap = cache.getMultiMap(MULTI_CACHE_CONFIG);
     this.tableMap = cache.getMap(SINGLE_CACHE_CONFIG);
     Preconditions.checkNotNull(tableMap);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
index 469140c..5e21878 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.partitionsender;
 
 import io.netty.buffer.ByteBuf;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -50,9 +49,7 @@ public class StatusHandler extends BaseRpcOutcomeListener<Ack> {
   public void failed(RpcException ex) {
     sendCount.decrement();
     logger.error("Failure while sending data to user.", ex);
-    boolean verbose = 
context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
-    ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while 
sending fragment to client.", ex, logger,
-      verbose);
+    ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while 
sending fragment to client.", ex, logger);
     ok = false;
     this.ex = ex;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
index 10ae6e3..9b071ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
 
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+
 public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 8f43b06..0016d6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -23,7 +23,7 @@ import io.netty.channel.ChannelFuture;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 
 /**
  * Manages the creation of rpc futures for a particular socket.
@@ -146,7 +146,7 @@ public class CoordinationQueue {
     return crpc;
   }
 
-  public void updateFailedFuture(int coordinationId, RpcFailure failure) {
+  public void updateFailedFuture(int coordinationId, DrillPBError failure) {
     // logger.debug("Updating failed future.");
     try {
       RpcOutcome<?> rpc = removeFromMap(coordinationId);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
index edad63e..5eda350 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
@@ -34,8 +34,15 @@ public class OutboundRpcMessage extends RpcMessage {
   final MessageLite pBody;
   public ByteBuf[] dBodies;
 
+
+
   public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int 
coordinationId, MessageLite pBody, ByteBuf... dBodies) {
-    super(mode, rpcType.getNumber(), coordinationId);
+      this(mode, rpcType.getNumber(), coordinationId, pBody, dBodies);
+  }
+
+
+  OutboundRpcMessage(RpcMode mode, int rpcTypeNumber, int coordinationId, 
MessageLite pBody, ByteBuf... dBodies) {
+    super(mode, rpcTypeNumber, coordinationId);
     this.pBody = pBody;
 
     // Netty doesn't traditionally release the reference on an unreadable buffer.  However, we need to so that if we send a empty or unwritable buffer, we still release.  otherwise we get weird memory leaks when sending empty vectors.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java
index d75e902..14ea873 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java
@@ -17,22 +17,27 @@
  */
 package org.apache.drill.exec.rpc;
 
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.work.ErrorHelper;
 
 public class RemoteRpcException extends RpcException{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RemoteRpcException.class);
 
-  private final RpcFailure failure;
+  private final DrillPBError failure;
 
-  public RemoteRpcException(RpcFailure failure) {
-    super(String.format("Failure while executing rpc.  Remote failure message: 
[%s].  Error Code: [%d].  Remote Error Id: [%d]", failure.getShortError(), 
failure.getErrorId(), failure.getErrorCode()));
+  public RemoteRpcException(DrillPBError failure) {
+    super(ErrorHelper.getErrorMessage(failure, false));
     this.failure = failure;
   }
 
-  public RpcFailure getFailure() {
+  @Override
+  public DrillPBError getRemoteError() {
     return failure;
   }
 
-
+  @Override
+  public boolean isRemote() {
+    return true;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 918ca0b..96c9911 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -30,8 +30,10 @@ import java.io.Closeable;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.work.ErrorHelper;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Internal.EnumLite;
@@ -188,7 +190,16 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       case REQUEST: {
         // handle message and ack.
         ResponseSender sender = new ResponseSenderImpl(connection, 
msg.coordinationId);
-        handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
+        try {
+          handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
+        } catch(UserRpcException e){
+          DrillPBError error = ErrorHelper.logAndConvertError(e.getEndpoint(), 
e.getUserMessage(), e, logger);
+          OutboundRpcMessage outMessage = new 
OutboundRpcMessage(RpcMode.RESPONSE_FAILURE, 0, msg.coordinationId, error);
+          if (RpcConstants.EXTRA_DEBUGGING) {
+            logger.debug("Adding message to outbound buffer. {}", outMessage);
+          }
+          connection.getChannel().writeAndFlush(outMessage);
+        }
         msg.release();  // we release our ownership.  Handle could have taken 
over ownership.
         break;
       }
@@ -212,7 +223,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
         break;
 
       case RESPONSE_FAILURE:
-        RpcFailure failure = RpcFailure.parseFrom(new 
ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+        DrillPBError failure = DrillPBError.parseFrom(new 
ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
         queue.updateFailedFuture(msg.coordinationId, failure);
         msg.release();
         if (RpcConstants.EXTRA_DEBUGGING) {
@@ -227,40 +238,6 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   }
 
-//  private class Listener implements GenericFutureListener<ChannelFuture> {
-//
-//    private int coordinationId;
-//    private Class<?> clazz;
-//
-//    public Listener(int coordinationId, Class<?> clazz) {
-//      this.coordinationId = coordinationId;
-//      this.clazz = clazz;
-//    }
-//
-//    @Override
-//    public void operationComplete(ChannelFuture channelFuture) throws 
Exception {
-//      // logger.debug("Completed channel write.");
-//
-//      if (channelFuture.isCancelled()) {
-//        RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
-//        rpcFuture.setException(new CancellationException("Socket operation 
was canceled."));
-//      } else if (!channelFuture.isSuccess()) {
-//        try {
-//          channelFuture.get();
-//          throw new IllegalStateException("Future was described as completed 
and not succesful but did not throw an exception.");
-//        } catch (Exception e) {
-//          logger.error("Error occurred during Rpc", e);
-//          RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, 
clazz);
-//          rpcFuture.setException(e);
-//        }
-//      } else {
-//        // send was successful. No need to modify DrillRpcFuture.
-//        return;
-//      }
-//    }
-//
-//  }
-
   public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
     try {
       ByteBufInputStream is = new ByteBufInputStream(pBody);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
index 34256f3..f9da6f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -58,6 +58,7 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{
     if (!ctx.channel().isOpen()) {
       //output.add(ctx.alloc().buffer(0));
       logger.debug("Channel closed, skipping encode.");
+      msg.release();
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
index eb870b3..a6d0e8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.common.exceptions.DrillIOException;
 import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 
 /**
  * Parent class for all rpc exceptions.
@@ -66,4 +67,11 @@ public class RpcException extends DrillIOException{
     return new RpcException(message, t);
   }
 
+  public boolean isRemote(){
+    return false;
+  }
+
+  public DrillPBError getRemoteError(){
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java
new file mode 100644
index 0000000..1d2cc1a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+public class UserRpcException extends RpcException {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UserRpcException.class);
+
+  private final String message;
+  private final DrillbitEndpoint endpoint;
+  private final Throwable t;
+
+  public UserRpcException(DrillbitEndpoint endpoint, String message, Throwable 
t) {
+    super(t);
+    this.message = message;
+    this.endpoint = endpoint;
+    this.t = t;
+  }
+
+  public String getUserMessage() {
+    return message;
+  }
+
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+  public Throwable getUserException() {
+    return t;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 31fbe7b..1308c37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -21,7 +21,7 @@ package org.apache.drill.exec.rpc.control;
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -36,14 +36,14 @@ public class ControlRpcConfig {
 
   public static RpcConfig MAPPING = 
RpcConfig.newBuilder("BIT-CONTROL-RPC-MAPPING") //
       .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, 
BitControlHandshake.class)
-      .add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, 
Ack.class)
+      .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, 
RpcType.ACK, Ack.class)
       .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, 
Ack.class)
       .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, 
Ack.class)
       .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, 
Ack.class)
       .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, 
QueryProfile.class)
       .build();
 
-  public static int RPC_VERSION = 2;
+  public static int RPC_VERSION = 3;
 
   public static final Response OK = new Response(RpcType.ACK, Acks.OK);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index d035c10..461cd8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,8 +17,11 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import java.util.Collection;
+
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -47,8 +50,8 @@ public class ControlTunnel {
     return manager.getEndpoint();
   }
 
-  public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, 
PlanFragment fragment){
-    SendFragment b = new SendFragment(outcomeListener, fragment);
+  public void sendFragments(RpcOutcomeListener<Ack> outcomeListener, 
InitializeFragments fragments){
+    SendFragment b = new SendFragment(outcomeListener, fragments);
     manager.runCommand(b);
   }
 
@@ -121,16 +124,16 @@ public class ControlTunnel {
   }
 
   public static class SendFragment extends ListeningCommand<Ack, 
ControlConnection> {
-    final PlanFragment fragment;
+    final InitializeFragments fragments;
 
-    public SendFragment(RpcOutcomeListener<Ack> listener, PlanFragment 
fragment) {
+    public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments 
fragments) {
       super(listener);
-      this.fragment = fragment;
+      this.fragments = fragments;
     }
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENT, 
fragment, Ack.class);
+      connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENTS, 
fragments, Ack.class);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index 45acd13..23380ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
@@ -82,78 +83,73 @@ public class WorkEventBus {
     }
   }
 
-  public void setRootFragmentManager(RootFragmentManager fragmentManager) {
-    FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), 
fragmentManager);
-    if (old != null) {
-      throw new IllegalStateException(
-          "Tried to set fragment manager when has already been set for the 
provided fragment handle.");
+  public void setFragmentManager(FragmentManager fragmentManager) {
+    logger.debug("Manager created: {}", 
QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+
+    synchronized (managers) {
+      FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), 
fragmentManager);
+      managers.notifyAll();
+      if (old != null) {
+        throw new IllegalStateException(
+            "Tried to set fragment manager when has already been set for the 
provided fragment handle.");
+      }
     }
   }
 
-  public FragmentManager getFragmentManager(FragmentHandle handle) {
+  public FragmentManager getFragmentManagerIfExists(FragmentHandle handle){
     return managers.get(handle);
-  }
 
-  public void cancelFragment(FragmentHandle handle) {
-    cancelledFragments.put(handle, null);
-    removeFragmentManager(handle);
   }
 
-  public FragmentManager getOrCreateFragmentManager(FragmentHandle handle) 
throws FragmentSetupException{
+  public FragmentManager getFragmentManager(FragmentHandle handle) throws 
FragmentSetupException {
+
+    // check if this was a recently canceled fragment.  If so, throw away 
message.
     if (cancelledFragments.asMap().containsKey(handle)) {
       logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", 
handle);
       return null;
     }
-    // We need to synchronize this part. Without that, multiple bit servers 
will be creating a Fragment manager and the
-    // corresponding FragmentContext object. Each FragmentContext object 
registers with the TopLevelAllocator so that
-    // the allocator can manage fragment resources across all fragments. So we 
need to make sure only one
-    // FragmentManager is actually created and used for a given FragmentHandle.
-    FragmentManager newManager;
-    FragmentManager manager;
-
-    manager = managers.get(handle);
-    if (manager != null) {
-      return manager;
+
+    // chm manages concurrency better then everyone fighting for the same lock 
so we'll do a double check.
+    FragmentManager m = managers.get(handle);
+    if(m != null){
+      return m;
     }
-    if (logger.isDebugEnabled()) {
-      String fragHandles = "Looking for Fragment handle: " + handle.toString() 
+ "(Hash Code:" + handle.hashCode()
-        + ")\n Fragment Handles in Fragment manager: ";
-      for (FragmentHandle h : managers.keySet()) {
-        fragHandles += h.toString() + "\n";
-        fragHandles += "[Hash Code: " + h.hashCode() + "]\n";
+
+    logger.debug("Fragment was requested but no manager exists.  Waiting for 
manager for fragment: {}", QueryIdHelper.getQueryIdentifier(handle));
+    try{
+    // We need to handle the race condition between the fragments being sent 
to leaf nodes and intermediate nodes.  It is possible that a leaf node would 
send a data batch to a intermediate node before the intermediate node received 
the associated plan.  As such, we will wait here for a bit to see if the 
appropriate fragment shows up.
+    long expire = System.currentTimeMillis() + 30*1000;
+    synchronized(managers){
+
+      // we loop because we may be woken up by some other, unrelated manager 
insertion.
+      while(true){
+        m = managers.get(handle);
+        if(m != null) {
+          return m;
+        }
+        long timeToWait = expire - System.currentTimeMillis();
+        if(timeToWait <= 0){
+          break;
+        }
+
+        managers.wait(timeToWait);
       }
-      logger.debug(fragHandles);
-    }
-    DistributedMap<FragmentHandle, PlanFragment> planCache = 
bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE);
-//      for (Map.Entry<FragmentHandle, PlanFragment> e : 
planCache.getLocalEntries()) {
-//      logger.debug("Key: {}", e.getKey());
-//      logger.debug("Value: {}", e.getValue());
-//      }
-    PlanFragment fragment = 
bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE).get(handle);
-
-    if (fragment == null) {
-      throw new FragmentSetupException("Received batch where fragment was not 
in cache.");
+
+      throw new FragmentSetupException("Failed to receive plan fragment that 
was required for id: " + QueryIdHelper.getQueryIdentifier(handle));
     }
-    logger.debug("Allocating new non root fragment manager: " + 
handle.toString());
-    newManager = new NonRootFragmentManager(fragment, bee);
-    logger.debug("Allocated new non root fragment manager: " + 
handle.toString());
-
-    manager = managers.putIfAbsent(fragment.getHandle(), newManager);
-    if (manager == null) {
-      // we added a handler, inform the bee that we did so. This way, the 
foreman can track status.
-      bee.addFragmentPendingRemote(newManager);
-      manager = newManager;
-    }else{
-      // prevent a leak of the initial allocation.
-      // Also the fragment context is registered with the top level allocator.
-      // This will unregister the unused fragment context as well.
-      newManager.getFragmentContext().close();
+    }catch(InterruptedException e){
+      throw new FragmentSetupException("Interrupted while waiting to receive 
plan fragment..");
     }
+  }
 
-    return manager;
+  public void cancelFragment(FragmentHandle handle) {
+    logger.debug("Fragment canceled: {}", 
QueryIdHelper.getQueryIdentifier(handle));
+    cancelledFragments.put(handle, null);
+    removeFragmentManager(handle);
   }
 
   public void removeFragmentManager(FragmentHandle handle) {
+    logger.debug("Removing fragment manager: {}", 
QueryIdHelper.getQueryIdentifier(handle));
     managers.remove(handle);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 2c6e02c..1f261bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -108,7 +108,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
     FragmentHandle handle = fragmentBatch.getHandle();
 
     try {
-      FragmentManager manager = 
workBus.getOrCreateFragmentManager(fragmentBatch.getHandle());
+      FragmentManager manager = 
workBus.getFragmentManager(fragmentBatch.getHandle());
       if (manager == null) {
         if (body != null) {
           body.release();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 2125166..e8f175b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -21,8 +21,6 @@ import java.io.Closeable;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.infinispan.ICache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
 import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
@@ -84,7 +82,6 @@ public class Drillbit implements Closeable{
 
   final ClusterCoordinator coord;
   final ServiceEngine engine;
-  final DistributedCache cache;
   final PStoreProvider storeProvider;
   final WorkManager manager;
   final BootStrapContext context;
@@ -107,13 +104,11 @@ public class Drillbit implements Closeable{
 
     if(serviceSet != null) {
       this.coord = serviceSet.getCoordinator();
-      this.cache = serviceSet.getCache();
       this.storeProvider = new LocalPStoreProvider(config);
     } else {
       Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
       this.coord = new ZKClusterCoordinator(config);
       this.storeProvider = new PStoreRegistry(this.coord, 
config).newPStoreProvider();
-      this.cache = new ICache(config, context.getAllocator(), false);
     }
   }
 
@@ -148,8 +143,7 @@ public class Drillbit implements Closeable{
     coord.start(10000);
     storeProvider.start();
     DrillbitEndpoint md = engine.start();
-    manager.start(md, cache, engine.getController(), 
engine.getDataConnectionCreator(), coord, storeProvider);
-    cache.run();
+    manager.start(md, engine.getController(), 
engine.getDataConnectionCreator(), coord, storeProvider);
     manager.getContext().getStorage().init();
     manager.getContext().getOptionManager().init();
     handle = coord.register(md);
@@ -173,11 +167,6 @@ public class Drillbit implements Closeable{
     } catch (Exception e) {
       logger.warn("Failure while shutting down embedded jetty server.");
     }
-    try {
-      cache.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
     Closeables.closeQuietly(engine);
     Closeables.closeQuietly(storeProvider);
     Closeables.closeQuietly(coord);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 7d48711..165d5f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -22,7 +22,6 @@ import io.netty.channel.EventLoopGroup;
 import java.util.Collection;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -49,7 +48,6 @@ public class DrillbitContext {
   private PhysicalPlanReader reader;
   private final ClusterCoordinator coord;
   private final DataConnectionCreator connectionsPool;
-  private final DistributedCache cache;
   private final DrillbitEndpoint endpoint;
   private final StoragePluginRegistry storagePlugins;
   private final OperatorCreatorRegistry operatorCreatorRegistry;
@@ -60,7 +58,7 @@ public class DrillbitContext {
   private final PStoreProvider provider;
   private final CodeCompiler compiler;
 
-  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, 
ClusterCoordinator coord, Controller controller, DataConnectionCreator 
connectionsPool, DistributedCache cache, WorkEventBus workBus, PStoreProvider 
provider) {
+  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, 
ClusterCoordinator coord, Controller controller, DataConnectionCreator 
connectionsPool, WorkEventBus workBus, PStoreProvider provider) {
     super();
     Preconditions.checkNotNull(endpoint);
     Preconditions.checkNotNull(context);
@@ -71,7 +69,6 @@ public class DrillbitContext {
     this.context = context;
     this.coord = coord;
     this.connectionsPool = connectionsPool;
-    this.cache = cache;
     this.endpoint = endpoint;
     this.provider = provider;
     this.storagePlugins = new StoragePluginRegistry(this);
@@ -79,7 +76,7 @@ public class DrillbitContext {
     this.operatorCreatorRegistry = new 
OperatorCreatorRegistry(context.getConfig());
     this.functionRegistry = new 
FunctionImplementationRegistry(context.getConfig());
     this.systemOptions = new SystemOptionManager(context.getConfig(), 
provider);
-    this.compiler = new CodeCompiler(context.getConfig(), cache, 
systemOptions);
+    this.compiler = new CodeCompiler(context.getConfig(), systemOptions);
   }
 
   public FunctionImplementationRegistry getFunctionImplementationRegistry() {
@@ -134,10 +131,6 @@ public class DrillbitContext {
     return context.getMetrics();
   }
 
-  public DistributedCache getCache(){
-    return cache;
-  }
-
   public PhysicalPlanReader getPlanReader(){
     return reader;
   }

Reply via email to