http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java new file mode 100644 index 0000000..46d4eca --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache.infinispan; + +import org.infinispan.marshall.core.MarshalledEntry; +import org.infinispan.persistence.spi.ExternalStore; +import org.infinispan.persistence.spi.InitializationContext; + +/** + * Stores the cached objects in zookeeper. Objects are stored in /start/cache_name/key_name = data + * @param <K> + * @param <V> + */ +public class ZookeeperCacheStore<K, V> implements ExternalStore<K, V>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperCacheStore.class); + + private String cacheName; + + @Override + public void init(InitializationContext ctx) { + ctx.getConfiguration(); + + } + + @Override + public MarshalledEntry<K, V> load(K key) { + return null; + } + + @Override + public boolean contains(K key) { + return false; + } + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public void write(MarshalledEntry<K, V> entry) { + } + + @Override + public boolean delete(K key) { + return false; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java new file mode 100644 index 0000000..e66cc90 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache.local; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.drill.common.util.DataInputInputStream; +import org.apache.drill.common.util.DataOutputOutputStream; +import org.apache.drill.exec.cache.Counter; +import org.apache.drill.exec.cache.DistributedCache; +import org.apache.drill.exec.cache.DistributedMap; +import org.apache.drill.exec.cache.DistributedMultiMap; +import org.apache.drill.exec.cache.DrillSerializable; +import org.apache.drill.exec.exception.DrillbitStartupException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; + +public class LocalCache implements DistributedCache { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class); + + private volatile Map<FragmentHandle, PlanFragment> handles; + private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps; + private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps; + private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps; + private volatile ConcurrentMap<String, Counter> counters; + private static final BufferAllocator allocator = new TopLevelAllocator(); + + private static final ObjectMapper mapper = DrillConfig.create().getMapper(); + + @Override + public void close() throws IOException { + handles = null; + } + + @Override + public void run() throws DrillbitStartupException { + handles = Maps.newConcurrentMap(); + maps = Maps.newConcurrentMap(); + multiMaps = Maps.newConcurrentMap(); + counters = Maps.newConcurrentMap(); + namedMaps = Maps.newConcurrentMap(); + } + + @Override + public PlanFragment getFragment(FragmentHandle handle) { +// logger.debug("looking for fragment with handle: {}", handle); + return handles.get(handle); + } + + @Override + public void storeFragment(PlanFragment fragment) { +// logger.debug("Storing fragment: {}", fragment); + handles.put(fragment.getHandle(), fragment); + } + + @Override + public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) { + DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) multiMaps.get(clazz); + if (mmap == null) { + multiMaps.putIfAbsent(clazz, new LocalDistributedMultiMapImpl<V>(clazz)); + return (DistributedMultiMap<V>) multiMaps.get(clazz); + } else { + return mmap; + } + } + + @Override + public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) { + DistributedMap m = maps.get(clazz); + if (m == null) { + maps.putIfAbsent(clazz, new LocalDistributedMapImpl<V>(clazz)); + return (DistributedMap<V>) maps.get(clazz); + } else { + return m; + } + } + + + @Override + public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) { + DistributedMap m = namedMaps.get(clazz); + if (m == null) { + namedMaps.putIfAbsent(name, new LocalDistributedMapImpl<V>(clazz)); + return (DistributedMap<V>) namedMaps.get(name); + } else { + return m; + } + } + + @Override + public Counter getCounter(String name) { + Counter c = counters.get(name); + if (c == null) { + counters.putIfAbsent(name, new LocalCounterImpl()); + return counters.get(name); + } else { + return c; + } + } + + public static ByteArrayDataOutput serialize(DrillSerializable obj) { + if(obj instanceof JacksonSerializable){ + try{ + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + out.write(mapper.writeValueAsBytes(obj)); + return out; + }catch(Exception e){ + throw new RuntimeException(e); + } + } + + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out); + try { + obj.writeToStream(outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + try { + outputStream.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return out; + } + + public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) { + if(JacksonSerializable.class.isAssignableFrom(clazz)){ + try{ + return (V) mapper.readValue(bytes, clazz); + }catch(Exception e){ + throw new RuntimeException(e); + } + } + + ByteArrayDataInput in = ByteStreams.newDataInput(bytes); + InputStream inputStream = DataInputInputStream.constructInputStream(in); + try { + V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator); + obj.readFromStream(inputStream); + return obj; + } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + public static class LocalDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> { + private ArrayListMultimap<String, ByteArrayDataOutput> mmap; + private Class<V> clazz; + + public LocalDistributedMultiMapImpl(Class<V> clazz) { + mmap = ArrayListMultimap.create(); + this.clazz = clazz; + } + + @Override + public Collection<V> get(String key) { + List<V> list = Lists.newArrayList(); + for (ByteArrayDataOutput o : mmap.get(key)) { + list.add(deserialize(o.toByteArray(), this.clazz)); + } + return list; + } + + @Override + public void put(String key, DrillSerializable value) { + mmap.put(key, serialize(value)); + } + } + + public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> { + protected ConcurrentMap<String, ByteArrayDataOutput> m; + protected Class<V> clazz; + + public LocalDistributedMapImpl(Class<V> clazz) { + m = Maps.newConcurrentMap(); + this.clazz = clazz; + } + + @Override + public V get(String key) { + if (m.get(key) == null) return null; + ByteArrayDataOutput b = m.get(key); + byte[] bytes = b.toByteArray(); + return (V) deserialize(m.get(key).toByteArray(), this.clazz); + } + + @Override + public void put(String key, V value) { + m.put(key, serialize(value)); + } + + @Override + public void putIfAbsent(String key, V value) { + m.putIfAbsent(key, serialize(value)); + } + + @Override + public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) { + m.putIfAbsent(key, serialize(value)); + logger.warn("Expiration not implemented in local map cache"); + } + + private class DeserializingTransformer implements Iterator<Map.Entry<String, V> >{ + private Iterator<Map.Entry<String, ByteArrayDataOutput>> inner; + + public DeserializingTransformer(Iterator<Entry<String, ByteArrayDataOutput>> inner) { + super(); + this.inner = inner; + } + + @Override + public boolean hasNext() { + return inner.hasNext(); + } + + @Override + public Entry<String, V> next() { + return newEntry(inner.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + public Entry<String, V> newEntry(final Entry<String, ByteArrayDataOutput> input) { + return new Map.Entry<String, V>(){ + + @Override + public String getKey() { + return input.getKey(); + } + + @Override + public V getValue() { + return deserialize(input.getValue().toByteArray(), clazz); + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + + }; + } + + } + @Override + public Iterator<Entry<String, V>> iterator() { + return new DeserializingTransformer(m.entrySet().iterator()); + } + } + + public static class LocalCounterImpl implements Counter { + private AtomicLong al = new AtomicLong(); + + @Override + public long get() { + return al.get(); + } + + @Override + public long incrementAndGet() { + return al.incrementAndGet(); + } + + @Override + public long decrementAndGet() { + return al.decrementAndGet(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index f105363..a0c439e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -25,13 +25,13 @@ import java.util.concurrent.TimeUnit; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; -import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.cache.CachedVectorContainer; import org.apache.drill.exec.cache.Counter; import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.cache.DistributedMap; @@ -115,9 +115,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart private int recordCount; private final IntVector partitionKeyVector; - private final DistributedMap<VectorAccessibleSerializable> tableMap; + private final DistributedMap<CachedVectorContainer> tableMap; private final Counter minorFragmentSampleCount; - private final DistributedMultiMap<VectorAccessibleSerializable> mmap; + private final DistributedMultiMap<CachedVectorContainer> mmap; private final String mapKey; private List<VectorContainer> sampledIncomingBatches; @@ -131,8 +131,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart this.completionFactor = pop.getCompletionFactor(); DistributedCache cache = context.getDrillbitContext().getCache(); - this.mmap = cache.getMultiMap(VectorAccessibleSerializable.class); - this.tableMap = cache.getMap(VectorAccessibleSerializable.class); + this.mmap = cache.getMultiMap(CachedVectorContainer.class); + this.tableMap = cache.getMap(CachedVectorContainer.class); Preconditions.checkNotNull(tableMap); this.mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId()); @@ -220,7 +220,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // into a serializable wrapper object, and then add to distributed map WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false); - VectorAccessibleSerializable sampleToSave = new VectorAccessibleSerializable(batch, oContext.getAllocator()); + CachedVectorContainer sampleToSave = new CachedVectorContainer(batch, context.getAllocator()); mmap.put(mapKey, sampleToSave); this.sampledIncomingBatches = builder.getHeldRecordBatches(); @@ -251,7 +251,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart return false; } - VectorAccessibleSerializable finalTable = null; + CachedVectorContainer finalTable = null; long val = minorFragmentSampleCount.incrementAndGet(); logger.debug("Incremented mfsc, got {}", val); @@ -301,8 +301,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Get all samples from distributed map - SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); - for (VectorAccessibleSerializable w : mmap.get(mapKey)) { + SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES); + for (CachedVectorContainer w : mmap.get(mapKey)) { containerBuilder.add(w.get()); } VectorContainer allSamplesContainer = new VectorContainer(); @@ -346,7 +346,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } candidatePartitionTable.setRecordCount(copier.getOutputRecords()); WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false); - VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator()); + CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator()); tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES); candidatePartitionTable.clear(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 7297dc3..0e3181d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -22,7 +22,7 @@ import java.io.Closeable; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.cache.DistributedCache; -import org.apache.drill.exec.cache.HazelCache; +import org.apache.drill.exec.cache.hazel.HazelCache; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle; import org.apache.drill.exec.coord.ZKClusterCoordinator; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java index c0b82bd..2078107 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java @@ -21,17 +21,17 @@ import java.io.Closeable; import java.io.IOException; import org.apache.drill.exec.cache.DistributedCache; -import org.apache.drill.exec.cache.LocalCache; +import org.apache.drill.exec.cache.local.LocalCache; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.LocalClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; public class RemoteServiceSet implements Closeable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class); - + private final DistributedCache cache; private final ClusterCoordinator coordinator; - + public RemoteServiceSet(DistributedCache cache, ClusterCoordinator coordinator) { super(); this.cache = cache; @@ -46,16 +46,21 @@ public class RemoteServiceSet implements Closeable{ public ClusterCoordinator getCoordinator() { return coordinator; } - - + + @Override public void close() throws IOException { + try{ cache.close(); + }catch(Exception e){ + if(e instanceof IOException) throw (IOException) e; + throw new IOException("Failure while closing cache", e); + } coordinator.close(); } public static RemoteServiceSet getLocalServiceSet(){ return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator()); } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java index 99e712b..20722d9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java @@ -29,7 +29,7 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.cache.DistributedCache; -import org.apache.drill.exec.cache.LocalCache; +import org.apache.drill.exec.cache.local.LocalCache; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.ops.QueryContext; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java new file mode 100644 index 0000000..13322f1 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.OutputStream; +import java.util.List; + +import org.infinispan.Cache; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.Configuration; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.global.GlobalConfiguration; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.manager.DefaultCacheManager; +import org.infinispan.manager.EmbeddedCacheManager; + +import com.google.hive12.common.collect.Lists; + +public class ISpan { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ISpan.class); + + + public static void main(String[] args) throws Exception{ + GlobalConfiguration gc = new GlobalConfigurationBuilder().transport().defaultTransport().build(); + Configuration c = new ConfigurationBuilder() // + .clustering().cacheMode(CacheMode.DIST_ASYNC) // + .storeAsBinary() + .build(); + EmbeddedCacheManager ecm = new DefaultCacheManager(gc, c); + + Cache<String, List<XT>> cache = ecm.getCache(); + List<XT> items = Lists.newArrayList(); + items.add(new XT(1)); + items.add(new XT(2)); + + cache.put("items", items); + for(XT x : cache.get("items")){ + System.out.println(x.i); + } + + + } + + private static class XT extends AbstractDataSerializable{ + + int i =0; + + + public XT(int i) { + super(); + this.i = i; + } + + @Override + public void read(DataInput input) throws IOException { + i = input.readInt(); + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeInt(i); + } + + @Override + public String toString() { + return "XT [i=" + i + "]"; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java index a3d39a3..7686614 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.cache; -import com.beust.jcommander.internal.Lists; +import java.util.List; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.expression.ExpressionPosition; @@ -25,67 +25,105 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecTest; +import org.apache.drill.exec.cache.hazel.HazelCache; +import org.apache.drill.exec.cache.infinispan.ICache; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.record.*; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; -import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.vector.AllocationHelper; +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VarBinaryVector; import org.junit.Test; -import java.util.List; +import com.beust.jcommander.internal.Lists; -public class TestVectorCache extends ExecTest{ +public class TestVectorCache extends ExecTest{ - @Test - public void testVectorCache() throws Exception { + private void testCache(DrillConfig config, DistributedCache dcache) throws Exception { List<ValueVector> vectorList = Lists.newArrayList(); RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - DrillConfig config = DrillConfig.create(); - Drillbit bit = new Drillbit(config, serviceSet); - bit.run(); - DrillbitContext context = bit.getContext(); - HazelCache cache = new HazelCache(config, context.getAllocator()); - cache.run(); - MaterializedField intField = MaterializedField.create(SchemaPath.getSimplePath("int"), Types.required(TypeProtos.MinorType.INT)); - IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator()); - MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.VARBINARY)); - VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator()); - AllocationHelper.allocate(intVector, 4, 4); - AllocationHelper.allocate(binVector, 4, 5); - vectorList.add(intVector); - vectorList.add(binVector); - - intVector.getMutator().setSafe(0, 0); binVector.getMutator().setSafe(0, "ZERO".getBytes()); - intVector.getMutator().setSafe(1, 1); binVector.getMutator().setSafe(1, "ONE".getBytes()); - intVector.getMutator().setSafe(2, 2); binVector.getMutator().setSafe(2, "TWO".getBytes()); - intVector.getMutator().setSafe(3, 3); binVector.getMutator().setSafe(3, "THREE".getBytes()); - intVector.getMutator().setValueCount(4); - binVector.getMutator().setValueCount(4); - - VectorContainer container = new VectorContainer(); - container.addCollection(vectorList); - container.setRecordCount(4); - WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false); - VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator()); - - DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class); - mmap.put("vectors", wrap); - VectorAccessibleSerializable newWrap = (VectorAccessibleSerializable)mmap.get("vectors").iterator().next(); - - VectorAccessible newContainer = newWrap.get(); - for (VectorWrapper w : newContainer) { - ValueVector vv = w.getValueVector(); - int values = vv.getAccessor().getValueCount(); - for (int i = 0; i < values; i++) { - Object o = vv.getAccessor().getObject(i); - if (o instanceof byte[]) { - System.out.println(new String((byte[])o)); - } else { - System.out.println(o); + try (Drillbit bit = new Drillbit(config, serviceSet); DistributedCache cache = dcache) { + bit.run(); + cache.run(); + + DrillbitContext context = bit.getContext(); + + + MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), + Types.required(TypeProtos.MinorType.INT)); + IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, context.getAllocator()); + MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), + Types.required(TypeProtos.MinorType.VARBINARY)); + VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, context.getAllocator()); + AllocationHelper.allocate(intVector, 4, 4); + AllocationHelper.allocate(binVector, 4, 5); + vectorList.add(intVector); + vectorList.add(binVector); + + intVector.getMutator().set(0, 0); + binVector.getMutator().set(0, "ZERO".getBytes()); + intVector.getMutator().set(1, 1); + binVector.getMutator().set(1, "ONE".getBytes()); + intVector.getMutator().set(2, 2); + binVector.getMutator().set(2, "TWO".getBytes()); + intVector.getMutator().set(3, 3); + binVector.getMutator().set(3, "THREE".getBytes()); + intVector.getMutator().setValueCount(4); + binVector.getMutator().setValueCount(4); + + VectorContainer container = new VectorContainer(); + container.addCollection(vectorList); + container.setRecordCount(4); + WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false); + CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getAllocator()); + + DistributedMultiMap<CachedVectorContainer> mmap = cache.getMultiMap(CachedVectorContainer.class); + mmap.put("vectors", wrap); + + CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next(); + + VectorAccessible newContainer = newWrap.get(); + for (VectorWrapper<?> w : newContainer) { + ValueVector vv = w.getValueVector(); + int values = vv.getAccessor().getValueCount(); + for (int i = 0; i < values; i++) { + Object o = vv.getAccessor().getObject(i); + if (o instanceof byte[]) { + System.out.println(new String((byte[]) o)); + } else { + System.out.println(o); + } } } + + newWrap.clear(); } + + } + + @Test + public void testHazelVectorCache() throws Exception { + DrillConfig c = DrillConfig.create(); + HazelCache cache = new HazelCache(c, new TopLevelAllocator()); + cache.run(); + testCache(c, cache); + cache.close(); + } + + @Test + public void testICache() throws Exception { + DrillConfig c = DrillConfig.create(); + ICache cache = new ICache(c, new TopLevelAllocator()); + testCache(c, cache); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java index 63bc0a9..a5dbfe5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java @@ -23,7 +23,7 @@ import net.hydromatic.optiq.tools.Frameworks; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecTest; -import org.apache.drill.exec.cache.LocalCache; +import org.apache.drill.exec.cache.local.LocalCache; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.DrillbitContext; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java index acb5929..3ccb96b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java @@ -18,12 +18,13 @@ package org.apache.drill.exec.store.ischema; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import net.hydromatic.optiq.SchemaPlus; import net.hydromatic.optiq.tools.Frameworks; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.cache.LocalCache; +import org.apache.drill.exec.cache.local.LocalCache; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.DrillbitContext; @@ -40,7 +41,7 @@ public class OrphanSchema { * @return root node of the created schema. */ public static SchemaPlus create() throws Exception { - + final DrillConfig c = DrillConfig.create(); // Mock up a context which will allow us to create a schema. @@ -51,7 +52,7 @@ public class OrphanSchema { when(bitContext.getCache()).thenReturn(new LocalCache()); bitContext.getCache().run(); - + // Using the mock context, get the orphan schema. StoragePluginRegistry r = new StoragePluginRegistry(bitContext); r.init();
