DRILL-1436: Remove use of UDP based cache for purposes of intermediate PlanFragment distribution
Includes: - Remove dependency on Infinispan - Update initialize fragments to send in batches. - Update RPC layer to capture UserRpcExceptions and propagate back. - Send full stack trace in DrillPBError and let foreman node decide on formatting. - Increment control rpc version - Update systables to report current drillbit and version Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/451dd608 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/451dd608 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/451dd608 Branch: refs/heads/master Commit: 451dd608a62c08dde26a2da2f085ab11a218ee72 Parents: 6dca24a Author: Jacques Nadeau <jacq...@apache.org> Authored: Sun Oct 19 10:46:25 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Sun Oct 26 21:54:38 2014 -0700 ---------------------------------------------------------------------- exec/java-exec/pom.xml | 15 - .../drill/exec/cache/infinispan/ICache.java | 339 -- .../infinispan/JacksonAdvancedExternalizer.java | 71 - .../ProtobufAdvancedExternalizer.java | 66 - .../infinispan/VAAdvancedExternalizer.java | 71 - .../cache/infinispan/ZookeeperCacheStore.java | 66 - .../drill/exec/cache/local/LocalCache.java | 407 --- .../drill/exec/compile/ClassTransformer.java | 5 +- .../apache/drill/exec/compile/CodeCompiler.java | 10 +- .../apache/drill/exec/ops/FragmentContext.java | 4 +- .../apache/drill/exec/ops/FragmentStats.java | 9 +- .../org/apache/drill/exec/ops/QueryContext.java | 6 - .../drill/exec/physical/impl/ScreenCreator.java | 4 +- .../BroadcastSenderRootExec.java | 4 +- .../OrderedPartitionRecordBatch.java | 2 +- .../impl/partitionsender/StatusHandler.java | 5 +- .../drill/exec/rpc/BaseRpcOutcomeListener.java | 2 + .../drill/exec/rpc/CoordinationQueue.java | 4 +- .../drill/exec/rpc/OutboundRpcMessage.java | 9 +- .../drill/exec/rpc/RemoteRpcException.java | 17 +- .../java/org/apache/drill/exec/rpc/RpcBus.java | 51 +- .../org/apache/drill/exec/rpc/RpcEncoder.java | 1 + .../org/apache/drill/exec/rpc/RpcException.java | 8 + .../apache/drill/exec/rpc/UserRpcException.java | 49 + .../exec/rpc/control/ControlRpcConfig.java | 6 +- .../drill/exec/rpc/control/ControlTunnel.java | 15 +- .../drill/exec/rpc/control/WorkEventBus.java | 102 +- .../apache/drill/exec/rpc/data/DataServer.java | 2 +- .../org/apache/drill/exec/server/Drillbit.java | 13 +- .../drill/exec/server/DrillbitContext.java | 11 +- .../drill/exec/server/RemoteServiceSet.java | 23 +- .../server/options/SystemOptionManager.java | 7 - .../drill/exec/store/sys/DrillbitIterator.java | 4 + .../drill/exec/store/sys/SystemTable.java | 3 +- .../drill/exec/store/sys/SystemTablePlugin.java | 2 + .../drill/exec/store/sys/VersionIterator.java | 78 + .../org/apache/drill/exec/work/ErrorHelper.java | 215 +- .../org/apache/drill/exec/work/WorkManager.java | 5 +- .../exec/work/batch/ControlHandlerImpl.java | 50 +- .../exec/work/batch/ControlMessageHandler.java | 5 +- .../apache/drill/exec/work/foreman/Foreman.java | 50 +- .../drill/exec/work/foreman/FragmentData.java | 6 +- .../drill/exec/work/foreman/QueryManager.java | 49 +- .../work/fragment/AbstractStatusReporter.java | 3 +- .../java/org/apache/drill/PlanningBase.java | 8 - .../java/org/apache/drill/TestBugFixes.java | 9 + .../exec/cache/TestCacheSerialization.java | 150 - .../exec/compile/TestClassTransformation.java | 3 +- .../exec/physical/impl/TestOptiqPlans.java | 17 +- .../TestOrderedPartitionExchange.java | 2 + .../apache/drill/exec/server/TestBitRpc.java | 2 +- .../vector/complex/writer/TestJsonReader.java | 4 +- .../java/org/apache/drill/jdbc/DrillCursor.java | 2 +- .../org/apache/drill/exec/proto/BitControl.java | 755 ++++- .../drill/exec/proto/GeneralRPCProtos.java | 882 +----- .../drill/exec/proto/SchemaBitControl.java | 113 + .../exec/proto/SchemaGeneralRPCProtos.java | 132 - .../drill/exec/proto/SchemaUserBitShared.java | 290 +- .../apache/drill/exec/proto/UserBitShared.java | 2982 ++++++++++++++++-- .../drill/exec/proto/beans/DrillPBError.java | 30 +- .../exec/proto/beans/ExceptionWrapper.java | 243 ++ .../exec/proto/beans/InitializeFragments.java | 175 + .../proto/beans/StackTraceElementWrapper.java | 251 ++ protocol/src/main/protobuf/BitControl.proto | 6 +- protocol/src/main/protobuf/GeneralRPC.proto | 8 +- protocol/src/main/protobuf/UserBitShared.proto | 19 +- 66 files changed, 5045 insertions(+), 2912 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/pom.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index d05e4c6..e75f572 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -306,21 +306,6 @@ <version>2.5.0</version> </dependency> <dependency> - <groupId>com.hazelcast</groupId> - <artifactId>hazelcast</artifactId> - <version>3.1.4</version> - </dependency> - <dependency> - <groupId>org.infinispan</groupId> - <artifactId>infinispan-core</artifactId> - <version>6.0.2.Final</version> - </dependency> - <dependency> - <groupId>org.infinispan</groupId> - <artifactId>infinispan-tree</artifactId> - <version>6.0.2.Final</version> - </dependency> - <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>janino</artifactId> <version>2.7.4</version> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java deleted file mode 100644 index 6627a89..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.cache.infinispan; - -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.cache.Counter; -import org.apache.drill.exec.cache.DistributedCache; -import org.apache.drill.exec.cache.DistributedMap; -import org.apache.drill.exec.cache.DistributedMultiMap; -import org.apache.drill.exec.cache.SerializationDefinition; -import org.apache.drill.exec.cache.local.LocalCache.LocalCounterImpl; -import org.apache.drill.exec.exception.DrillbitStartupException; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.proto.BitControl.FragmentStatus; -import org.apache.drill.exec.proto.BitControl.PlanFragment; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.infinispan.Cache; -import org.infinispan.atomic.Delta; -import org.infinispan.atomic.DeltaAware; -import org.infinispan.configuration.cache.CacheMode; -import org.infinispan.configuration.cache.Configuration; -import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.configuration.global.GlobalConfigurationBuilder; -import org.infinispan.manager.DefaultCacheManager; -import org.infinispan.manager.EmbeddedCacheManager; -import org.infinispan.remoting.transport.jgroups.JGroupsTransport; -import org.jgroups.blocks.atomic.CounterService; -import org.jgroups.fork.ForkChannel; -import org.jgroups.protocols.COUNTER; -import org.jgroups.protocols.FRAG2; -import org.jgroups.stack.ProtocolStack; - -import com.google.common.collect.Maps; - - -public class ICache implements DistributedCache{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ICache.class); - - private EmbeddedCacheManager manager; - private ForkChannel cacheChannel; - private final CounterService counters; - private final boolean local; - private volatile ConcurrentMap<String, Counter> localCounters; - - public ICache(DrillConfig config, BufferAllocator allocator, boolean local) throws Exception { - String clusterName = config.getString(ExecConstants.SERVICE_NAME); - this.local = local; - - final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_SYNC; - GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder(); - - if(!local){ - gcb.transport() // - .defaultTransport() // - .clusterName(clusterName); - } - - gcb.serialization() // - .addAdvancedExternalizer(new VAAdvancedExternalizer(allocator)) // - .addAdvancedExternalizer(new JacksonAdvancedExternalizer<>(SerializationDefinition.OPTION, config.getMapper())) // - .addAdvancedExternalizer(new JacksonAdvancedExternalizer<>(SerializationDefinition.STORAGE_PLUGINS, config.getMapper())) // - .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_STATUS, FragmentStatus.PARSER)) // - .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_HANDLE, FragmentHandle.PARSER)) // - .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.PLAN_FRAGMENT, PlanFragment.PARSER)) // - .build(); - - Configuration c = new ConfigurationBuilder() // - .clustering() // - - .cacheMode(mode) // - .storeAsBinary().enable() // - .build(); - this.manager = new DefaultCacheManager(gcb.build(), c); - - if(!local){ - JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport(); - this.cacheChannel = new ForkChannel(transport.getChannel(), "drill-stack", "drill-hijacker", true, ProtocolStack.ABOVE, FRAG2.class, new COUNTER()); - this.counters = new CounterService(this.cacheChannel); - }else{ - this.cacheChannel = null; - this.counters = null; - } - } - - -// @Override -// public <K, V> Map<K, V> getSmallAtomicMap(CacheConfig<K, V> config) { -// Cache<String, ?> cache = manager.getCache("atomic-maps"); -// return AtomicMapLookup.getAtomicMap(cache, config.getName()); -// } - - - @Override - public void close() throws IOException { - manager.stop(); - } - - @Override - public void run() throws DrillbitStartupException { - try { - if(local){ - localCounters = Maps.newConcurrentMap(); - manager.start(); - }else{ - cacheChannel.connect("c1"); - } - - } catch (Exception e) { - throw new DrillbitStartupException("Failure while trying to set up JGroups."); - } - } - - @Override - public <K, V> DistributedMultiMap<K, V> getMultiMap(CacheConfig<K, V> config) { - Cache<K, DeltaList<V>> cache = manager.getCache(config.getName()); - return new IMulti<K, V>(cache, config); - } - - @Override - public <K, V> DistributedMap<K, V> getMap(CacheConfig<K, V> config) { - Cache<K, V> c = manager.getCache(config.getName()); - return new IMap<K, V>(c, config); - } - - @Override - public Counter getCounter(String name) { - if(local){ - Counter c = localCounters.get(name); - if (c == null) { - localCounters.putIfAbsent(name, new LocalCounterImpl()); - return localCounters.get(name); - } else { - return c; - } - - }else{ - return new JGroupsCounter(counters.getOrCreateCounter(name, 0)); - } - - } - - private class JGroupsCounter implements Counter{ - final org.jgroups.blocks.atomic.Counter inner; - - public JGroupsCounter(org.jgroups.blocks.atomic.Counter inner) { - super(); - this.inner = inner; - } - - @Override - public long get() { - return inner.get(); - } - - @Override - public long incrementAndGet() { - return inner.incrementAndGet(); - } - - @Override - public long decrementAndGet() { - return inner.decrementAndGet(); - } - - } - - private class IMap<K, V> implements DistributedMap<K, V>{ - - private Cache<K, V> cache; - private CacheConfig<K, V> config; - - public IMap(Cache<K, V> cache, CacheConfig<K, V> config) { - super(); - this.cache = cache; - this.config = config; - } - - @Override - public Iterable<Entry<K, V>> getLocalEntries() { - return cache.entrySet(); - } - - @Override - public V get(K key) { - return cache.get(key); - } - - @Override - public Future<V> delete(K key) { - return cache.removeAsync(key); - } - - @Override - public Future<V> put(K key, V value) { - return cache.putAsync(key, value); - } - - @Override - public Future<V> putIfAbsent(K key, V value) { - return cache.putIfAbsentAsync(key, value); - } - - @Override - public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) { - return cache.putIfAbsentAsync(key, value, ttl, timeUnit); - } - - } - - private class IMulti<K, V> implements DistributedMultiMap<K, V>{ - - private Cache<K, DeltaList<V>> cache; - private CacheConfig<K, V> config; - - public IMulti(Cache<K, DeltaList<V>> cache, CacheConfig<K, V> config) { - super(); - this.cache = cache; - this.config = config; - } - - @Override - public Collection<V> get(K key) { - return cache.get(key); - } - - @Override - public Future<Boolean> put(K key, V value) { - return new ICacheFuture(cache.putAsync(key, new DeltaList(value))); - } - - } - - public static class ICacheFuture implements Future<Boolean> { - - Future future; - - public ICacheFuture(Future future) { - this.future = future; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); - } - - @Override - public boolean isCancelled() { - return future.isCancelled(); - } - - @Override - public boolean isDone() { - return future.isDone(); - } - - @Override - public Boolean get() throws InterruptedException, ExecutionException { - future.get(); - return true; - } - - @Override - public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - future.get(timeout, unit); - return true; - } - } - - - - - private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta, List<V> { - - /** The serialVersionUID */ - private static final long serialVersionUID = 2176345973026460708L; - - public DeltaList(Collection<? extends V> c) { - super(c); - } - - public DeltaList(V obj) { - super(); - add(obj); - } - - @Override - public Delta delta() { - return new DeltaList<V>(this); - } - - @Override - public void commit() { - this.clear(); - } - - @SuppressWarnings("unchecked") - @Override - public DeltaAware merge(DeltaAware d) { - List<V> other = null; - if (d != null && d instanceof DeltaList) { - other = (List<V>) d; - for (V e : this) { - other.add(e); - } - return (DeltaAware) other; - } else { - return this; - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java deleted file mode 100644 index 55633ab..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.cache.infinispan; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collections; -import java.util.Set; - -import org.apache.drill.exec.cache.SerializationDefinition; -import org.infinispan.commons.marshall.AdvancedExternalizer; - -import com.fasterxml.jackson.databind.ObjectMapper; - -public class JacksonAdvancedExternalizer<T> implements AdvancedExternalizer<T> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonAdvancedExternalizer.class); - - private final Class<?> clazz; - private final ObjectMapper mapper; - private final int id; - - public JacksonAdvancedExternalizer(SerializationDefinition def, ObjectMapper mapper){ - this.clazz = def.clazz; - this.mapper = mapper; - this.id = def.id; - } - - @Override - public T readObject(ObjectInput in) throws IOException, ClassNotFoundException { - byte[] bytes = new byte[in.readInt()]; - in.readFully(bytes); - return (T) mapper.readValue(bytes, clazz); - } - - @Override - public void writeObject(ObjectOutput out, T object) throws IOException { - byte[] bytes = mapper.writeValueAsBytes(object); - out.writeInt(bytes.length); - out.write(bytes); - } - - @Override - public Integer getId() { - return id; - } - - @Override - public Set<Class<? extends T>> getTypeClasses() { - return (Set<Class<? extends T>>) (Object) Collections.singleton(clazz); - } - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java deleted file mode 100644 index 7b638ee..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.cache.infinispan; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collections; -import java.util.Set; - -import org.apache.drill.common.util.DataInputInputStream; -import org.apache.drill.exec.cache.SerializationDefinition; -import org.infinispan.commons.marshall.AdvancedExternalizer; - -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -public class ProtobufAdvancedExternalizer<T extends Message> implements AdvancedExternalizer<T> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufAdvancedExternalizer.class); - - private final Class<?> clazz; - private final int id; - private final Parser<T> parser; - - public ProtobufAdvancedExternalizer(SerializationDefinition def, Parser<T> parser){ - this.clazz = def.clazz; - this.parser = parser; - this.id = def.id; - } - - @Override - public T readObject(ObjectInput in) throws IOException, ClassNotFoundException { - return parser.parseFrom(DataInputInputStream.constructInputStream(in)); - } - - @Override - public void writeObject(ObjectOutput out, T object) throws IOException { - out.write(object.toByteArray()); - } - - @Override - public Integer getId() { - return id; - } - - @Override - public Set<Class<? extends T>> getTypeClasses() { - return (Set<Class<? extends T>>) (Object) Collections.singleton(clazz); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java deleted file mode 100644 index f072628..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.cache.infinispan; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Set; - -import org.apache.drill.exec.cache.CachedVectorContainer; -import org.apache.drill.exec.memory.BufferAllocator; -import org.infinispan.commons.marshall.AdvancedExternalizer; - -import com.google.common.collect.ImmutableSet; - -public class VAAdvancedExternalizer implements AdvancedExternalizer<CachedVectorContainer> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VAAdvancedExternalizer.class); - - private BufferAllocator allocator; - - - public VAAdvancedExternalizer(BufferAllocator allocator) { - super(); - this.allocator = allocator; - } - - static final Set<Class<? extends CachedVectorContainer>> CLASSES = // - (Set<Class<? extends CachedVectorContainer>>) // - (Object) ImmutableSet.of(CachedVectorContainer.class); - - @Override - public CachedVectorContainer readObject(ObjectInput in) throws IOException, ClassNotFoundException { - int length = in.readInt(); - byte[] b = new byte[length]; - in.read(b); - CachedVectorContainer va = new CachedVectorContainer(b, allocator); - return va; - } - - @Override - public void writeObject(ObjectOutput out, CachedVectorContainer va) throws IOException { - out.writeInt(va.getData().length); - out.write(va.getData()); - } - - @Override - public Integer getId() { - // magic number for this class, assume drill uses 3001-3100. - return 3001; - } - - @Override - public Set<Class<? extends CachedVectorContainer>> getTypeClasses() { - return CLASSES; - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java deleted file mode 100644 index 46d4eca..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.cache.infinispan; - -import org.infinispan.marshall.core.MarshalledEntry; -import org.infinispan.persistence.spi.ExternalStore; -import org.infinispan.persistence.spi.InitializationContext; - -/** - * Stores the cached objects in zookeeper. Objects are stored in /start/cache_name/key_name = data - * @param <K> - * @param <V> - */ -public class ZookeeperCacheStore<K, V> implements ExternalStore<K, V>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperCacheStore.class); - - private String cacheName; - - @Override - public void init(InitializationContext ctx) { - ctx.getConfiguration(); - - } - - @Override - public MarshalledEntry<K, V> load(K key) { - return null; - } - - @Override - public boolean contains(K key) { - return false; - } - - @Override - public void start() { - } - - @Override - public void stop() { - } - - @Override - public void write(MarshalledEntry<K, V> entry) { - } - - @Override - public boolean delete(K key) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java deleted file mode 100644 index 99ead1c..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java +++ /dev/null @@ -1,407 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.cache.local; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.util.DataOutputOutputStream; -import org.apache.drill.exec.cache.Counter; -import org.apache.drill.exec.cache.DistributedCache; -import org.apache.drill.exec.cache.DistributedMap; -import org.apache.drill.exec.cache.DistributedMultiMap; -import org.apache.drill.exec.cache.DrillSerializable; -import org.apache.drill.exec.exception.DrillbitStartupException; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.TopLevelAllocator; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimaps; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -public class LocalCache implements DistributedCache { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class); - - private volatile ConcurrentMap<CacheConfig<?, ?>, DistributedMap<?, ?>> maps; - private volatile ConcurrentMap<CacheConfig<?, ?>, DistributedMultiMap<?, ?>> multiMaps; - private volatile ConcurrentMap<String, Counter> counters; - private static final BufferAllocator allocator = new TopLevelAllocator(DrillConfig.create()); - - private static final ObjectMapper mapper = DrillConfig.create().getMapper(); - - @Override - public void close() throws IOException { - - } - - @Override - public void run() throws DrillbitStartupException { - maps = Maps.newConcurrentMap(); - multiMaps = Maps.newConcurrentMap(); - counters = Maps.newConcurrentMap(); - } - - @Override - public <K, V> DistributedMultiMap<K, V> getMultiMap(CacheConfig<K, V> config) { - DistributedMultiMap<K, V> mmap = (DistributedMultiMap<K, V>) multiMaps.get(config); - if (mmap == null) { - multiMaps.putIfAbsent(config, new LocalDistributedMultiMapImpl<K, V>(config)); - return (DistributedMultiMap<K, V>) multiMaps.get(config); - } else { - return mmap; - } - } - - @Override - public <K, V> DistributedMap<K, V> getMap(CacheConfig<K, V> config) { - DistributedMap<K, V> m = (DistributedMap<K, V>) maps.get(config); - if (m == null) { - maps.putIfAbsent(config, new LocalDistributedMapImpl<K, V>(config)); - return (DistributedMap<K, V>) maps.get(config); - } else { - return m; - } - } - - @Override - public Counter getCounter(String name) { - Counter c = counters.get(name); - if (c == null) { - counters.putIfAbsent(name, new LocalCounterImpl()); - return counters.get(name); - } else { - return c; - } - } - - private static BytesHolder serialize(Object obj, SerializationMode mode) { - if (obj instanceof String) { - return new BytesHolder( ((String)obj).getBytes(Charsets.UTF_8)); - } - try{ - switch (mode) { - case DRILL_SERIALIZIABLE: { - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out); - ((DrillSerializable)obj).writeToStream(outputStream); - outputStream.flush(); - return new BytesHolder(out.toByteArray()); - } - - case JACKSON: { - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - out.write(mapper.writeValueAsBytes(obj)); - return new BytesHolder(out.toByteArray()); - } - - case PROTOBUF: - return new BytesHolder(( (Message) obj).toByteArray()); - - } - } catch (Exception e) { - throw new RuntimeException(e); - } - - throw new UnsupportedOperationException(); - } - - private static <V> V deserialize(BytesHolder b, SerializationMode mode, Class<V> clazz) { - byte[] bytes = b.bytes; - try { - - if (clazz == String.class) { - return (V) new String(bytes, Charsets.UTF_8); - } - - switch (mode) { - case DRILL_SERIALIZIABLE: { - InputStream inputStream = new ByteArrayInputStream(bytes); - V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator); - ((DrillSerializable) obj).readFromStream(inputStream); - return obj; - } - - case JACKSON: { - return (V) mapper.readValue(bytes, clazz); - } - - case PROTOBUF: { - Parser<V> parser = null; - for (Field f : clazz.getFields()) { - if (f.getName().equals("PARSER") && Modifier.isStatic(f.getModifiers())) { - parser = (Parser<V>) f.get(null); - } - } - if (parser == null) { - throw new UnsupportedOperationException(String.format("Unable to find parser for class %s.", clazz.getName())); - } - InputStream inputStream = new ByteArrayInputStream(bytes); - return parser.parseFrom(inputStream); - } - - } - } catch (Exception e) { - throw new RuntimeException(e); - } - - throw new UnsupportedOperationException(); - } - - private static class BytesHolder { - final byte[] bytes; - public BytesHolder(byte[] bytes) { - this.bytes = bytes; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + Arrays.hashCode(bytes); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - BytesHolder other = (BytesHolder) obj; - if (!Arrays.equals(bytes, other.bytes)) { - return false; - } - return true; - } - - } - - static class LocalDistributedMultiMapImpl<K, V> implements DistributedMultiMap<K, V> { - private ListMultimap<BytesHolder, BytesHolder> mmap; - private CacheConfig<K, V> config; - - public LocalDistributedMultiMapImpl(CacheConfig<K, V> config) { - ListMultimap<BytesHolder, BytesHolder> innerMap = ArrayListMultimap.create(); - mmap = Multimaps.synchronizedListMultimap(innerMap); - this.config = config; - } - - @Override - public Collection<V> get(K key) { - List<V> list = Lists.newArrayList(); - for (BytesHolder o : mmap.get(serialize(key, config.getMode()))) { - list.add(deserialize(o, config.getMode(), config.getValueClass())); - } - return list; - } - - @Override - public Future<Boolean> put(K key, V value) { - mmap.put(serialize(key, config.getMode()), serialize(value, config.getMode())); - return new LocalCacheFuture(true); - } - } - - public static class LocalCacheFuture<V> implements Future<V> { - - V value; - - public LocalCacheFuture(V value) { - this.value = value; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return true; - } - - @Override - public V get() throws InterruptedException, ExecutionException { - return value; - } - - @Override - public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return value; - } - } - - public static class LocalDistributedMapImpl<K, V> implements DistributedMap<K, V> { - protected ConcurrentMap<BytesHolder, BytesHolder> m; - protected CacheConfig<K, V> config; - - public LocalDistributedMapImpl(CacheConfig<K, V> config) { - m = Maps.newConcurrentMap(); - this.config = config; - } - - @Override - public V get(K key) { - BytesHolder b = m.get(serialize(key, config.getMode())); - if (b == null) { - return null; - } - return (V) deserialize(b, config.getMode(), config.getValueClass()); - } - - @Override - public Iterable<Entry<K, V>> getLocalEntries() { - return new Iterable<Entry<K, V>>() { - @Override - public Iterator<Entry<K, V>> iterator() { - return new DeserializingTransformer(m.entrySet().iterator()); - } - }; - - } - - @Override - public Future<V> put(K key, V value) { - m.put(serialize(key, config.getMode()), serialize(value, config.getMode())); - return new LocalCacheFuture(value); - } - - - @Override - public Future<V> putIfAbsent(K key, V value) { - m.putIfAbsent(serialize(key, config.getMode()), serialize(value, config.getMode())); - return new LocalCacheFuture(value); - } - - @Override - public Future<V> delete(K key) { - V value = get(key); - m.remove(serialize(key, config.getMode())); - return new LocalCacheFuture(value); - } - - @Override - public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) { - m.putIfAbsent(serialize(key, config.getMode()), serialize(value, config.getMode())); - logger.warn("Expiration not implemented in local map cache"); - return new LocalCacheFuture<V>(value); - } - - private class DeserializingTransformer implements Iterator<Map.Entry<K, V>> { - private Iterator<Map.Entry<BytesHolder, BytesHolder>> inner; - - public DeserializingTransformer(Iterator<Entry<BytesHolder, BytesHolder>> inner) { - super(); - this.inner = inner; - } - - @Override - public boolean hasNext() { - return inner.hasNext(); - } - - @Override - public Entry<K, V> next() { - return newEntry(inner.next()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - public Entry<K, V> newEntry(final Entry<BytesHolder, BytesHolder> input) { - return new Map.Entry<K, V>() { - - @Override - public K getKey() { - return deserialize(input.getKey(), config.getMode(), config.getKeyClass()); - } - - @Override - public V getValue() { - return deserialize(input.getValue(), config.getMode(), config.getValueClass()); - } - - @Override - public V setValue(V value) { - throw new UnsupportedOperationException(); - } - - }; - } - - } - - } - - public static class LocalCounterImpl implements Counter { - private AtomicLong al = new AtomicLong(); - - @Override - public long get() { - return al.get(); - } - - @Override - public long incrementAndGet() { - return al.incrementAndGet(); - } - - @Override - public long decrementAndGet() { - return al.decrementAndGet(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java index 2d69ca3..52d9e34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java @@ -24,7 +24,6 @@ import java.util.Set; import org.apache.drill.common.util.DrillStringUtils; import org.apache.drill.common.util.FileUtils; -import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.compile.MergeAdapter.MergedClassResult; import org.apache.drill.exec.exception.ClassTransformationException; import org.codehaus.commons.compiler.CompileException; @@ -40,10 +39,8 @@ public class ClassTransformer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassTransformer.class); private final ByteCodeLoader byteCodeLoader = new ByteCodeLoader(); - private final DistributedCache cache; - public ClassTransformer(DistributedCache cache) { - this.cache = cache; + public ClassTransformer() { } public static class ClassSet{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java index a9b0c61..5628ea3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.util.concurrent.ExecutionException; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.cache.DistributedCache; -import org.apache.drill.exec.cache.local.LocalCache; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.server.options.OptionManager; @@ -37,14 +35,12 @@ public class CodeCompiler { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodeCompiler.class); private final ClassTransformer transformer; - private final DistributedCache distributedCache; private final LoadingCache<CodeGenerator<?>, GeneratedClassEntry> cache; private final DrillConfig config; private final OptionManager systemOptionManager; - public CodeCompiler(DrillConfig config, DistributedCache distributedCache, OptionManager systemOptionManager){ - this.transformer = new ClassTransformer(distributedCache); - this.distributedCache = distributedCache; + public CodeCompiler(DrillConfig config, OptionManager systemOptionManager){ + this.transformer = new ClassTransformer(); this.cache = CacheBuilder // .newBuilder() // .maximumSize(1000) // @@ -87,6 +83,6 @@ public class CodeCompiler { } public static CodeCompiler getTestCompiler(DrillConfig c) throws IOException{ - return new CodeCompiler(c, new LocalCache(), new SystemOptionManager(c, new LocalPStoreProvider(c)).init()); + return new CodeCompiler(c, new SystemOptionManager(c, new LocalPStoreProvider(c)).init()); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 0564c1a..9b78c1d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -68,7 +68,6 @@ public class FragmentContext implements Closeable { private final FragmentStats stats; private final FunctionImplementationRegistry funcRegistry; private final QueryClassLoader loader; - private final ClassTransformer transformer; private final BufferAllocator allocator; private final PlanFragment fragment; private List<Thread> daemonThreads = Lists.newLinkedList(); @@ -85,8 +84,7 @@ public class FragmentContext implements Closeable { public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection, FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException { - this.transformer = new ClassTransformer(dbContext.getCache()); - this.stats = new FragmentStats(dbContext.getMetrics()); + this.stats = new FragmentStats(dbContext.getMetrics(), fragment.getAssignment()); this.context = dbContext; this.connection = connection; this.fragment = fragment; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java index 22872f9..4431235 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.ops; import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; import com.codahale.metrics.MetricRegistry; @@ -28,19 +29,19 @@ import com.google.common.collect.Lists; public class FragmentStats { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class); - private List<OperatorStats> operators = Lists.newArrayList(); private final long startTime; + private final DrillbitEndpoint endpoint; - public FragmentStats(MetricRegistry metrics) { + public FragmentStats(MetricRegistry metrics, DrillbitEndpoint endpoint) { this.startTime = System.currentTimeMillis(); + this.endpoint = endpoint; } public void addMetricsToStatus(MinorFragmentProfile.Builder prfB) { - prfB.setStartTime(startTime); prfB.setEndTime(System.currentTimeMillis()); - + prfB.setEndpoint(endpoint); for(OperatorStats o : operators){ prfB.addOperatorProfile(o.getProfile()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 1ad144d..4b290a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -23,7 +23,6 @@ import net.hydromatic.optiq.SchemaPlus; import net.hydromatic.optiq.jdbc.SimpleOptiqSchema; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.physical.PlannerSettings; @@ -112,11 +111,6 @@ public class QueryContext{ return drillbitContext.getStorage(); } - - public DistributedCache getCache(){ - return drillbitContext.getCache(); - } - public Collection<DrillbitEndpoint> getActiveEndpoints(){ return drillbitContext.getBits(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index bd15ac9..868eb6e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -104,7 +104,7 @@ public class ScreenCreator implements RootCreator<Screen>{ .setQueryId(context.getHandle().getQueryId()) // .setRowCount(0) // .setQueryState(QueryState.FAILED) - .addError(ErrorHelper.logAndConvertError(context.getIdentity(), "Screen received stop request sent.", + .addError(ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Query stopeed.", context.getFailureCause(), logger, verbose)) .setDef(RecordBatchDef.getDefaultInstance()) // .setIsLastChunk(true) // @@ -203,7 +203,7 @@ public class ScreenCreator implements RootCreator<Screen>{ sendCount.decrement(); logger.error("Failure while sending data to user.", ex); boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val; - ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger, + ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger, verbose); ok = false; this.ex = ex; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index c594e70..3c8e551 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -185,9 +185,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { public void failed(RpcException ex) { sendCount.decrement(); logger.error("Failure while sending data to user.", ex); - boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val; - ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger, - verbose); + ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger); ok = false; this.ex = ex; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index aecf363..a062074 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -141,7 +141,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart this.samplingFactor = pop.getSamplingFactor(); this.completionFactor = pop.getCompletionFactor(); - DistributedCache cache = context.getDrillbitContext().getCache(); + DistributedCache cache = null; this.mmap = cache.getMultiMap(MULTI_CACHE_CONFIG); this.tableMap = cache.getMap(SINGLE_CACHE_CONFIG); Preconditions.checkNotNull(tableMap); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java index 469140c..5e21878 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.partitionsender; import io.netty.buffer.ByteBuf; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -50,9 +49,7 @@ public class StatusHandler extends BaseRpcOutcomeListener<Ack> { public void failed(RpcException ex) { sendCount.decrement(); logger.error("Failure while sending data to user.", ex); - boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val; - ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger, - verbose); + ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger); ok = false; this.ex = ex; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java index 10ae6e3..9b071ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java @@ -19,6 +19,8 @@ package org.apache.drill.exec.rpc; import io.netty.buffer.ByteBuf; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError; + public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java index 8f43b06..0016d6a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelFuture; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError; /** * Manages the creation of rpc futures for a particular socket. @@ -146,7 +146,7 @@ public class CoordinationQueue { return crpc; } - public void updateFailedFuture(int coordinationId, RpcFailure failure) { + public void updateFailedFuture(int coordinationId, DrillPBError failure) { // logger.debug("Updating failed future."); try { RpcOutcome<?> rpc = removeFromMap(coordinationId); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java index edad63e..5eda350 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java @@ -34,8 +34,15 @@ public class OutboundRpcMessage extends RpcMessage { final MessageLite pBody; public ByteBuf[] dBodies; + + public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf... dBodies) { - super(mode, rpcType.getNumber(), coordinationId); + this(mode, rpcType.getNumber(), coordinationId, pBody, dBodies); + } + + + OutboundRpcMessage(RpcMode mode, int rpcTypeNumber, int coordinationId, MessageLite pBody, ByteBuf... dBodies) { + super(mode, rpcTypeNumber, coordinationId); this.pBody = pBody; // Netty doesn't traditionally release the reference on an unreadable buffer. However, we need to so that if we send a empty or unwritable buffer, we still release. otherwise we get weird memory leaks when sending empty vectors. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java index d75e902..14ea873 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java @@ -17,22 +17,27 @@ */ package org.apache.drill.exec.rpc; -import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError; +import org.apache.drill.exec.work.ErrorHelper; public class RemoteRpcException extends RpcException{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteRpcException.class); - private final RpcFailure failure; + private final DrillPBError failure; - public RemoteRpcException(RpcFailure failure) { - super(String.format("Failure while executing rpc. Remote failure message: [%s]. Error Code: [%d]. Remote Error Id: [%d]", failure.getShortError(), failure.getErrorId(), failure.getErrorCode())); + public RemoteRpcException(DrillPBError failure) { + super(ErrorHelper.getErrorMessage(failure, false)); this.failure = failure; } - public RpcFailure getFailure() { + @Override + public DrillPBError getRemoteError() { return failure; } - + @Override + public boolean isRemote() { + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 918ca0b..96c9911 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -30,8 +30,10 @@ import java.io.Closeable; import java.util.Arrays; import java.util.List; -import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError; +import org.apache.drill.exec.work.ErrorHelper; import com.google.common.base.Preconditions; import com.google.protobuf.Internal.EnumLite; @@ -188,7 +190,16 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp case REQUEST: { // handle message and ack. ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId); - handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender); + try { + handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender); + } catch(UserRpcException e){ + DrillPBError error = ErrorHelper.logAndConvertError(e.getEndpoint(), e.getUserMessage(), e, logger); + OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE_FAILURE, 0, msg.coordinationId, error); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Adding message to outbound buffer. {}", outMessage); + } + connection.getChannel().writeAndFlush(outMessage); + } msg.release(); // we release our ownership. Handle could have taken over ownership. break; } @@ -212,7 +223,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp break; case RESPONSE_FAILURE: - RpcFailure failure = RpcFailure.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes())); + DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes())); queue.updateFailedFuture(msg.coordinationId, failure); msg.release(); if (RpcConstants.EXTRA_DEBUGGING) { @@ -227,40 +238,6 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp } -// private class Listener implements GenericFutureListener<ChannelFuture> { -// -// private int coordinationId; -// private Class<?> clazz; -// -// public Listener(int coordinationId, Class<?> clazz) { -// this.coordinationId = coordinationId; -// this.clazz = clazz; -// } -// -// @Override -// public void operationComplete(ChannelFuture channelFuture) throws Exception { -// // logger.debug("Completed channel write."); -// -// if (channelFuture.isCancelled()) { -// RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz); -// rpcFuture.setException(new CancellationException("Socket operation was canceled.")); -// } else if (!channelFuture.isSuccess()) { -// try { -// channelFuture.get(); -// throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception."); -// } catch (Exception e) { -// logger.error("Error occurred during Rpc", e); -// RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz); -// rpcFuture.setException(e); -// } -// } else { -// // send was successful. No need to modify DrillRpcFuture. -// return; -// } -// } -// -// } - public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{ try { ByteBufInputStream is = new ByteBufInputStream(pBody); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java index 34256f3..f9da6f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java @@ -58,6 +58,7 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ if (!ctx.channel().isOpen()) { //output.add(ctx.alloc().buffer(0)); logger.debug("Channel closed, skipping encode."); + msg.release(); return; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java index eb870b3..a6d0e8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException; import org.apache.drill.common.exceptions.DrillIOException; import org.apache.drill.common.util.DrillStringUtils; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError; /** * Parent class for all rpc exceptions. @@ -66,4 +67,11 @@ public class RpcException extends DrillIOException{ return new RpcException(message, t); } + public boolean isRemote(){ + return false; + } + + public DrillPBError getRemoteError(){ + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java new file mode 100644 index 0000000..1d2cc1a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc; + +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +public class UserRpcException extends RpcException { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcException.class); + + private final String message; + private final DrillbitEndpoint endpoint; + private final Throwable t; + + public UserRpcException(DrillbitEndpoint endpoint, String message, Throwable t) { + super(t); + this.message = message; + this.endpoint = endpoint; + this.t = t; + } + + public String getUserMessage() { + return message; + } + + public DrillbitEndpoint getEndpoint() { + return endpoint; + } + + public Throwable getUserException() { + return t; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java index 31fbe7b..1308c37 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java @@ -21,7 +21,7 @@ package org.apache.drill.exec.rpc.control; import org.apache.drill.exec.proto.BitControl.BitControlHandshake; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; -import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.BitControl.InitializeFragments; import org.apache.drill.exec.proto.BitControl.RpcType; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -36,14 +36,14 @@ public class ControlRpcConfig { public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-CONTROL-RPC-MAPPING") // .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class) - .add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, Ack.class) + .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class) .build(); - public static int RPC_VERSION = 2; + public static int RPC_VERSION = 3; public static final Response OK = new Response(RpcType.ACK, Acks.OK); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index d035c10..461cd8a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -17,8 +17,11 @@ */ package org.apache.drill.exec.rpc.control; +import java.util.Collection; + import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; +import org.apache.drill.exec.proto.BitControl.InitializeFragments; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.BitControl.RpcType; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -47,8 +50,8 @@ public class ControlTunnel { return manager.getEndpoint(); } - public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, PlanFragment fragment){ - SendFragment b = new SendFragment(outcomeListener, fragment); + public void sendFragments(RpcOutcomeListener<Ack> outcomeListener, InitializeFragments fragments){ + SendFragment b = new SendFragment(outcomeListener, fragments); manager.runCommand(b); } @@ -121,16 +124,16 @@ public class ControlTunnel { } public static class SendFragment extends ListeningCommand<Ack, ControlConnection> { - final PlanFragment fragment; + final InitializeFragments fragments; - public SendFragment(RpcOutcomeListener<Ack> listener, PlanFragment fragment) { + public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments fragments) { super(listener); - this.fragment = fragment; + this.fragments = fragments; } @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class); + connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENTS, fragments, Ack.class); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index 45acd13..23380ff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.foreman.Foreman; @@ -82,78 +83,73 @@ public class WorkEventBus { } } - public void setRootFragmentManager(RootFragmentManager fragmentManager) { - FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager); - if (old != null) { - throw new IllegalStateException( - "Tried to set fragment manager when has already been set for the provided fragment handle."); + public void setFragmentManager(FragmentManager fragmentManager) { + logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())); + + synchronized (managers) { + FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager); + managers.notifyAll(); + if (old != null) { + throw new IllegalStateException( + "Tried to set fragment manager when has already been set for the provided fragment handle."); + } } } - public FragmentManager getFragmentManager(FragmentHandle handle) { + public FragmentManager getFragmentManagerIfExists(FragmentHandle handle){ return managers.get(handle); - } - public void cancelFragment(FragmentHandle handle) { - cancelledFragments.put(handle, null); - removeFragmentManager(handle); } - public FragmentManager getOrCreateFragmentManager(FragmentHandle handle) throws FragmentSetupException{ + public FragmentManager getFragmentManager(FragmentHandle handle) throws FragmentSetupException { + + // check if this was a recently canceled fragment. If so, throw away message. if (cancelledFragments.asMap().containsKey(handle)) { logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle); return null; } - // We need to synchronize this part. Without that, multiple bit servers will be creating a Fragment manager and the - // corresponding FragmentContext object. Each FragmentContext object registers with the TopLevelAllocator so that - // the allocator can manage fragment resources across all fragments. So we need to make sure only one - // FragmentManager is actually created and used for a given FragmentHandle. - FragmentManager newManager; - FragmentManager manager; - - manager = managers.get(handle); - if (manager != null) { - return manager; + + // chm manages concurrency better then everyone fighting for the same lock so we'll do a double check. + FragmentManager m = managers.get(handle); + if(m != null){ + return m; } - if (logger.isDebugEnabled()) { - String fragHandles = "Looking for Fragment handle: " + handle.toString() + "(Hash Code:" + handle.hashCode() - + ")\n Fragment Handles in Fragment manager: "; - for (FragmentHandle h : managers.keySet()) { - fragHandles += h.toString() + "\n"; - fragHandles += "[Hash Code: " + h.hashCode() + "]\n"; + + logger.debug("Fragment was requested but no manager exists. Waiting for manager for fragment: {}", QueryIdHelper.getQueryIdentifier(handle)); + try{ + // We need to handle the race condition between the fragments being sent to leaf nodes and intermediate nodes. It is possible that a leaf node would send a data batch to a intermediate node before the intermediate node received the associated plan. As such, we will wait here for a bit to see if the appropriate fragment shows up. + long expire = System.currentTimeMillis() + 30*1000; + synchronized(managers){ + + // we loop because we may be woken up by some other, unrelated manager insertion. + while(true){ + m = managers.get(handle); + if(m != null) { + return m; + } + long timeToWait = expire - System.currentTimeMillis(); + if(timeToWait <= 0){ + break; + } + + managers.wait(timeToWait); } - logger.debug(fragHandles); - } - DistributedMap<FragmentHandle, PlanFragment> planCache = bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE); -// for (Map.Entry<FragmentHandle, PlanFragment> e : planCache.getLocalEntries()) { -// logger.debug("Key: {}", e.getKey()); -// logger.debug("Value: {}", e.getValue()); -// } - PlanFragment fragment = bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE).get(handle); - - if (fragment == null) { - throw new FragmentSetupException("Received batch where fragment was not in cache."); + + throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle)); } - logger.debug("Allocating new non root fragment manager: " + handle.toString()); - newManager = new NonRootFragmentManager(fragment, bee); - logger.debug("Allocated new non root fragment manager: " + handle.toString()); - - manager = managers.putIfAbsent(fragment.getHandle(), newManager); - if (manager == null) { - // we added a handler, inform the bee that we did so. This way, the foreman can track status. - bee.addFragmentPendingRemote(newManager); - manager = newManager; - }else{ - // prevent a leak of the initial allocation. - // Also the fragment context is registered with the top level allocator. - // This will unregister the unused fragment context as well. - newManager.getFragmentContext().close(); + }catch(InterruptedException e){ + throw new FragmentSetupException("Interrupted while waiting to receive plan fragment.."); } + } - return manager; + public void cancelFragment(FragmentHandle handle) { + logger.debug("Fragment canceled: {}", QueryIdHelper.getQueryIdentifier(handle)); + cancelledFragments.put(handle, null); + removeFragmentManager(handle); } public void removeFragmentManager(FragmentHandle handle) { + logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle)); managers.remove(handle); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 2c6e02c..1f261bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -108,7 +108,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { FragmentHandle handle = fragmentBatch.getHandle(); try { - FragmentManager manager = workBus.getOrCreateFragmentManager(fragmentBatch.getHandle()); + FragmentManager manager = workBus.getFragmentManager(fragmentBatch.getHandle()); if (manager == null) { if (body != null) { body.release(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 2125166..e8f175b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -21,8 +21,6 @@ import java.io.Closeable; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.cache.DistributedCache; -import org.apache.drill.exec.cache.infinispan.ICache; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; @@ -84,7 +82,6 @@ public class Drillbit implements Closeable{ final ClusterCoordinator coord; final ServiceEngine engine; - final DistributedCache cache; final PStoreProvider storeProvider; final WorkManager manager; final BootStrapContext context; @@ -107,13 +104,11 @@ public class Drillbit implements Closeable{ if(serviceSet != null) { this.coord = serviceSet.getCoordinator(); - this.cache = serviceSet.getCache(); this.storeProvider = new LocalPStoreProvider(config); } else { Runtime.getRuntime().addShutdownHook(new ShutdownThread(config)); this.coord = new ZKClusterCoordinator(config); this.storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider(); - this.cache = new ICache(config, context.getAllocator(), false); } } @@ -148,8 +143,7 @@ public class Drillbit implements Closeable{ coord.start(10000); storeProvider.start(); DrillbitEndpoint md = engine.start(); - manager.start(md, cache, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider); - cache.run(); + manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider); manager.getContext().getStorage().init(); manager.getContext().getOptionManager().init(); handle = coord.register(md); @@ -173,11 +167,6 @@ public class Drillbit implements Closeable{ } catch (Exception e) { logger.warn("Failure while shutting down embedded jetty server."); } - try { - cache.close(); - } catch (Exception e) { - e.printStackTrace(); - } Closeables.closeQuietly(engine); Closeables.closeQuietly(storeProvider); Closeables.closeQuietly(coord); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/451dd608/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 7d48711..165d5f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -22,7 +22,6 @@ import io.netty.channel.EventLoopGroup; import java.util.Collection; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.compile.CodeCompiler; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; @@ -49,7 +48,6 @@ public class DrillbitContext { private PhysicalPlanReader reader; private final ClusterCoordinator coord; private final DataConnectionCreator connectionsPool; - private final DistributedCache cache; private final DrillbitEndpoint endpoint; private final StoragePluginRegistry storagePlugins; private final OperatorCreatorRegistry operatorCreatorRegistry; @@ -60,7 +58,7 @@ public class DrillbitContext { private final PStoreProvider provider; private final CodeCompiler compiler; - public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus, PStoreProvider provider) { + public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider) { super(); Preconditions.checkNotNull(endpoint); Preconditions.checkNotNull(context); @@ -71,7 +69,6 @@ public class DrillbitContext { this.context = context; this.coord = coord; this.connectionsPool = connectionsPool; - this.cache = cache; this.endpoint = endpoint; this.provider = provider; this.storagePlugins = new StoragePluginRegistry(this); @@ -79,7 +76,7 @@ public class DrillbitContext { this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig()); this.functionRegistry = new FunctionImplementationRegistry(context.getConfig()); this.systemOptions = new SystemOptionManager(context.getConfig(), provider); - this.compiler = new CodeCompiler(context.getConfig(), cache, systemOptions); + this.compiler = new CodeCompiler(context.getConfig(), systemOptions); } public FunctionImplementationRegistry getFunctionImplementationRegistry() { @@ -134,10 +131,6 @@ public class DrillbitContext { return context.getMetrics(); } - public DistributedCache getCache(){ - return cache; - } - public PhysicalPlanReader getPlanReader(){ return reader; }