I always wanted to use it but initially it had an incompatible license. They switched to APL2 with v6.
Infinispan backs the JBoss distributed cache and isn't defeatured the way Hazelcast is because it isn't RedHat's only product. I'm experimenting with it for comparison purposes. The other nice about Infinispan is that is built on Jgroups, which is very solid and has real world uses in larger clusters. It also could ultimately replace our drillbit control communication backplane, making things like broadcasts much cleaner. On Wed, May 21, 2014 at 7:56 PM, Timothy Chen <[email protected]> wrote: > Any motivation for using inifinispan instead of hazel cast? > > Tim > > Sent from my iPhone > > > On May 21, 2014, at 6:14 PM, [email protected] wrote: > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java > b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java > > new file mode 100644 > > index 0000000..46d4eca > > --- /dev/null > > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java > > @@ -0,0 +1,66 @@ > > +/** > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. > > + */ > > +package org.apache.drill.exec.cache.infinispan; > > + > > +import org.infinispan.marshall.core.MarshalledEntry; > > +import org.infinispan.persistence.spi.ExternalStore; > > +import org.infinispan.persistence.spi.InitializationContext; > > + > > +/** > > + * Stores the cached objects in zookeeper. Objects are stored in > /start/cache_name/key_name = data > > + * @param <K> > > + * @param <V> > > + */ > > +public class ZookeeperCacheStore<K, V> implements ExternalStore<K, V>{ > > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(ZookeeperCacheStore.class); > > + > > + private String cacheName; > > + > > + @Override > > + public void init(InitializationContext ctx) { > > + ctx.getConfiguration(); > > + > > + } > > + > > + @Override > > + public MarshalledEntry<K, V> load(K key) { > > + return null; > > + } > > + > > + @Override > > + public boolean contains(K key) { > > + return false; > > + } > > + > > + @Override > > + public void start() { > > + } > > + > > + @Override > > + public void stop() { > > + } > > + > > + @Override > > + public void write(MarshalledEntry<K, V> entry) { > > + } > > + > > + @Override > > + public boolean delete(K key) { > > + return false; > > + } > > +} > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java > b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java > > new file mode 100644 > > index 0000000..e66cc90 > > --- /dev/null > > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java > > @@ -0,0 +1,309 @@ > > +/** > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. > > + */ > > +package org.apache.drill.exec.cache.local; > > + > > +import java.io.IOException; > > +import java.io.InputStream; > > +import java.io.OutputStream; > > +import java.lang.reflect.InvocationTargetException; > > +import java.util.Collection; > > +import java.util.Iterator; > > +import java.util.List; > > +import java.util.Map; > > +import java.util.Map.Entry; > > +import java.util.concurrent.ConcurrentMap; > > +import java.util.concurrent.TimeUnit; > > +import java.util.concurrent.atomic.AtomicLong; > > + > > +import org.apache.drill.common.util.DataInputInputStream; > > +import org.apache.drill.common.util.DataOutputOutputStream; > > +import org.apache.drill.exec.cache.Counter; > > +import org.apache.drill.exec.cache.DistributedCache; > > +import org.apache.drill.exec.cache.DistributedMap; > > +import org.apache.drill.exec.cache.DistributedMultiMap; > > +import org.apache.drill.exec.cache.DrillSerializable; > > +import org.apache.drill.exec.exception.DrillbitStartupException; > > +import org.apache.drill.exec.memory.BufferAllocator; > > +import org.apache.drill.exec.memory.TopLevelAllocator; > > +import org.apache.drill.exec.proto.BitControl.PlanFragment; > > +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; > > + > > +import com.fasterxml.jackson.databind.ObjectMapper; > > +import com.google.common.collect.ArrayListMultimap; > > +import com.google.common.collect.Lists; > > +import com.google.common.collect.Maps; > > +import com.google.common.io.ByteArrayDataInput; > > +import com.google.common.io.ByteArrayDataOutput; > > +import com.google.common.io.ByteStreams; > > + > > +public class LocalCache implements DistributedCache { > > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(LocalCache.class); > > + > > + private volatile Map<FragmentHandle, PlanFragment> handles; > > + private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps; > > + private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps; > > + private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> > multiMaps; > > + private volatile ConcurrentMap<String, Counter> counters; > > + private static final BufferAllocator allocator = new > TopLevelAllocator(); > > + > > + private static final ObjectMapper mapper = > DrillConfig.create().getMapper(); > > + > > + @Override > > + public void close() throws IOException { > > + handles = null; > > + } > > + > > + @Override > > + public void run() throws DrillbitStartupException { > > + handles = Maps.newConcurrentMap(); > > + maps = Maps.newConcurrentMap(); > > + multiMaps = Maps.newConcurrentMap(); > > + counters = Maps.newConcurrentMap(); > > + namedMaps = Maps.newConcurrentMap(); > > + } > > + > > + @Override > > + public PlanFragment getFragment(FragmentHandle handle) { > > +// logger.debug("looking for fragment with handle: {}", handle); > > + return handles.get(handle); > > + } > > + > > + @Override > > + public void storeFragment(PlanFragment fragment) { > > +// logger.debug("Storing fragment: {}", fragment); > > + handles.put(fragment.getHandle(), fragment); > > + } > > + > > + @Override > > + public <V extends DrillSerializable> DistributedMultiMap<V> > getMultiMap(Class<V> clazz) { > > + DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) > multiMaps.get(clazz); > > + if (mmap == null) { > > + multiMaps.putIfAbsent(clazz, new > LocalDistributedMultiMapImpl<V>(clazz)); > > + return (DistributedMultiMap<V>) multiMaps.get(clazz); > > + } else { > > + return mmap; > > + } > > + } > > + > > + @Override > > + public <V extends DrillSerializable> DistributedMap<V> > getMap(Class<V> clazz) { > > + DistributedMap m = maps.get(clazz); > > + if (m == null) { > > + maps.putIfAbsent(clazz, new LocalDistributedMapImpl<V>(clazz)); > > + return (DistributedMap<V>) maps.get(clazz); > > + } else { > > + return m; > > + } > > + } > > + > > + > > + @Override > > + public <V extends DrillSerializable> DistributedMap<V> > getNamedMap(String name, Class<V> clazz) { > > + DistributedMap m = namedMaps.get(clazz); > > + if (m == null) { > > + namedMaps.putIfAbsent(name, new > LocalDistributedMapImpl<V>(clazz)); > > + return (DistributedMap<V>) namedMaps.get(name); > > + } else { > > + return m; > > + } > > + } > > + > > + @Override > > + public Counter getCounter(String name) { > > + Counter c = counters.get(name); > > + if (c == null) { > > + counters.putIfAbsent(name, new LocalCounterImpl()); > > + return counters.get(name); > > + } else { > > + return c; > > + } > > + } > > + > > + public static ByteArrayDataOutput serialize(DrillSerializable obj) { > > + if(obj instanceof JacksonSerializable){ > > + try{ > > + ByteArrayDataOutput out = ByteStreams.newDataOutput(); > > + out.write(mapper.writeValueAsBytes(obj)); > > + return out; > > + }catch(Exception e){ > > + throw new RuntimeException(e); > > + } > > + } > > + > > + ByteArrayDataOutput out = ByteStreams.newDataOutput(); > > + OutputStream outputStream = > DataOutputOutputStream.constructOutputStream(out); > > + try { > > + obj.writeToStream(outputStream); > > + } catch (IOException e) { > > + throw new RuntimeException(e); > > + } > > + try { > > + outputStream.flush(); > > + } catch (IOException e) { > > + throw new RuntimeException(e); > > + } > > + return out; > > + } > > + > > + public static <V extends DrillSerializable> V deserialize(byte[] > bytes, Class<V> clazz) { > > + if(JacksonSerializable.class.isAssignableFrom(clazz)){ > > + try{ > > + return (V) mapper.readValue(bytes, clazz); > > + }catch(Exception e){ > > + throw new RuntimeException(e); > > + } > > + } > > + > > + ByteArrayDataInput in = ByteStreams.newDataInput(bytes); > > + InputStream inputStream = > DataInputInputStream.constructInputStream(in); > > + try { > > + V obj = > clazz.getConstructor(BufferAllocator.class).newInstance(allocator); > > + obj.readFromStream(inputStream); > > + return obj; > > + } catch (InstantiationException | IllegalAccessException | > IOException | NoSuchMethodException | InvocationTargetException e) { > > + throw new RuntimeException(e); > > + } > > + } > > + > > + public static class LocalDistributedMultiMapImpl<V extends > DrillSerializable> implements DistributedMultiMap<V> { > > + private ArrayListMultimap<String, ByteArrayDataOutput> mmap; > > + private Class<V> clazz; > > + > > + public LocalDistributedMultiMapImpl(Class<V> clazz) { > > + mmap = ArrayListMultimap.create(); > > + this.clazz = clazz; > > + } > > + > > + @Override > > + public Collection<V> get(String key) { > > + List<V> list = Lists.newArrayList(); > > + for (ByteArrayDataOutput o : mmap.get(key)) { > > + list.add(deserialize(o.toByteArray(), this.clazz)); > > + } > > + return list; > > + } > > + > > + @Override > > + public void put(String key, DrillSerializable value) { > > + mmap.put(key, serialize(value)); > > + } > > + } > > + > > + public static class LocalDistributedMapImpl<V extends > DrillSerializable> implements DistributedMap<V> { > > + protected ConcurrentMap<String, ByteArrayDataOutput> m; > > + protected Class<V> clazz; > > + > > + public LocalDistributedMapImpl(Class<V> clazz) { > > + m = Maps.newConcurrentMap(); > > + this.clazz = clazz; > > + } > > + > > + @Override > > + public V get(String key) { > > + if (m.get(key) == null) return null; > > + ByteArrayDataOutput b = m.get(key); > > + byte[] bytes = b.toByteArray(); > > + return (V) deserialize(m.get(key).toByteArray(), this.clazz); > > + } > > + > > + @Override > > + public void put(String key, V value) { > > + m.put(key, serialize(value)); > > + } > > + > > + @Override > > + public void putIfAbsent(String key, V value) { > > + m.putIfAbsent(key, serialize(value)); > > + } > > + > > + @Override > > + public void putIfAbsent(String key, V value, long ttl, TimeUnit > timeUnit) { > > + m.putIfAbsent(key, serialize(value)); > > + logger.warn("Expiration not implemented in local map cache"); > > + } > > + > > + private class DeserializingTransformer implements > Iterator<Map.Entry<String, V> >{ > > + private Iterator<Map.Entry<String, ByteArrayDataOutput>> inner; > > + > > + public DeserializingTransformer(Iterator<Entry<String, > ByteArrayDataOutput>> inner) { > > + super(); > > + this.inner = inner; > > + } > > + > > + @Override > > + public boolean hasNext() { > > + return inner.hasNext(); > > + } > > + > > + @Override > > + public Entry<String, V> next() { > > + return newEntry(inner.next()); > > + } > > + > > + @Override > > + public void remove() { > > + throw new UnsupportedOperationException(); > > + } > > + > > + public Entry<String, V> newEntry(final Entry<String, > ByteArrayDataOutput> input) { > > + return new Map.Entry<String, V>(){ > > + > > + @Override > > + public String getKey() { > > + return input.getKey(); > > + } > > + > > + @Override > > + public V getValue() { > > + return deserialize(input.getValue().toByteArray(), clazz); > > + } > > + > > + @Override > > + public V setValue(V value) { > > + throw new UnsupportedOperationException(); > > + } > > + > > + }; > > + } > > + > > + } > > + @Override > > + public Iterator<Entry<String, V>> iterator() { > > + return new DeserializingTransformer(m.entrySet().iterator()); > > + } > > + } > > + > > + public static class LocalCounterImpl implements Counter { > > + private AtomicLong al = new AtomicLong(); > > + > > + @Override > > + public long get() { > > + return al.get(); > > + } > > + > > + @Override > > + public long incrementAndGet() { > > + return al.incrementAndGet(); > > + } > > + > > + @Override > > + public long decrementAndGet() { > > + return al.decrementAndGet(); > > + } > > + } > > +} > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > > index f105363..a0c439e 100644 > > --- > a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > > @@ -25,13 +25,13 @@ import java.util.concurrent.TimeUnit; > > > > import org.apache.drill.common.expression.ErrorCollector; > > import org.apache.drill.common.expression.ErrorCollectorImpl; > > -import org.apache.drill.common.expression.ExpressionPosition; > > import org.apache.drill.common.expression.FieldReference; > > import org.apache.drill.common.expression.LogicalExpression; > > import org.apache.drill.common.expression.SchemaPath; > > import org.apache.drill.common.logical.data.Order.Ordering; > > import org.apache.drill.common.types.TypeProtos; > > import org.apache.drill.common.types.Types; > > +import org.apache.drill.exec.cache.CachedVectorContainer; > > import org.apache.drill.exec.cache.Counter; > > import org.apache.drill.exec.cache.DistributedCache; > > import org.apache.drill.exec.cache.DistributedMap; > > @@ -115,9 +115,9 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > > private int recordCount; > > > > private final IntVector partitionKeyVector; > > - private final DistributedMap<VectorAccessibleSerializable> tableMap; > > + private final DistributedMap<CachedVectorContainer> tableMap; > > private final Counter minorFragmentSampleCount; > > - private final DistributedMultiMap<VectorAccessibleSerializable> mmap; > > + private final DistributedMultiMap<CachedVectorContainer> mmap; > > private final String mapKey; > > private List<VectorContainer> sampledIncomingBatches; > > > > @@ -131,8 +131,8 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > > this.completionFactor = pop.getCompletionFactor(); > > > > DistributedCache cache = context.getDrillbitContext().getCache(); > > - this.mmap = cache.getMultiMap(VectorAccessibleSerializable.class); > > - this.tableMap = cache.getMap(VectorAccessibleSerializable.class); > > + this.mmap = cache.getMultiMap(CachedVectorContainer.class); > > + this.tableMap = cache.getMap(CachedVectorContainer.class); > > Preconditions.checkNotNull(tableMap); > > > > this.mapKey = String.format("%s_%d", > context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId()); > > @@ -220,7 +220,7 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > > // into a serializable wrapper object, and then add to distributed > map > > > > WritableBatch batch = > WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), > containerToCache, false); > > - VectorAccessibleSerializable sampleToSave = new > VectorAccessibleSerializable(batch, oContext.getAllocator()); > > + CachedVectorContainer sampleToSave = new > CachedVectorContainer(batch, context.getAllocator()); > > > > mmap.put(mapKey, sampleToSave); > > this.sampledIncomingBatches = builder.getHeldRecordBatches(); > > @@ -251,7 +251,7 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > > return false; > > } > > > > - VectorAccessibleSerializable finalTable = null; > > + CachedVectorContainer finalTable = null; > > > > long val = minorFragmentSampleCount.incrementAndGet(); > > logger.debug("Incremented mfsc, got {}", val); > > @@ -301,8 +301,8 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > > > > // Get all samples from distributed map > > > > - SortRecordBatchBuilder containerBuilder = new > SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); > > - for (VectorAccessibleSerializable w : mmap.get(mapKey)) { > > + SortRecordBatchBuilder containerBuilder = new > SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES); > > + for (CachedVectorContainer w : mmap.get(mapKey)) { > > containerBuilder.add(w.get()); > > } > > VectorContainer allSamplesContainer = new VectorContainer(); > > @@ -346,7 +346,7 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > > } > > candidatePartitionTable.setRecordCount(copier.getOutputRecords()); > > WritableBatch batch = > WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), > candidatePartitionTable, false); > > - VectorAccessibleSerializable wrap = new > VectorAccessibleSerializable(batch, > context.getDrillbitContext().getAllocator()); > > + CachedVectorContainer wrap = new CachedVectorContainer(batch, > context.getDrillbitContext().getAllocator()); > > tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES); > > > > candidatePartitionTable.clear(); > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > > index 7297dc3..0e3181d 100644 > > --- > a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > > @@ -22,7 +22,7 @@ import java.io.Closeable; > > import org.apache.drill.common.config.DrillConfig; > > import org.apache.drill.exec.ExecConstants; > > import org.apache.drill.exec.cache.DistributedCache; > > -import org.apache.drill.exec.cache.HazelCache; > > +import org.apache.drill.exec.cache.hazel.HazelCache; > > import org.apache.drill.exec.coord.ClusterCoordinator; > > import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle; > > import org.apache.drill.exec.coord.ZKClusterCoordinator; > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > > index c0b82bd..2078107 100644 > > --- > a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > > @@ -21,17 +21,17 @@ import java.io.Closeable; > > import java.io.IOException; > > > > import org.apache.drill.exec.cache.DistributedCache; > > -import org.apache.drill.exec.cache.LocalCache; > > +import org.apache.drill.exec.cache.local.LocalCache; > > import org.apache.drill.exec.coord.ClusterCoordinator; > > import org.apache.drill.exec.coord.LocalClusterCoordinator; > > import org.apache.drill.exec.exception.DrillbitStartupException; > > > > public class RemoteServiceSet implements Closeable{ > > static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class); > > - > > + > > private final DistributedCache cache; > > private final ClusterCoordinator coordinator; > > - > > + > > public RemoteServiceSet(DistributedCache cache, ClusterCoordinator > coordinator) { > > super(); > > this.cache = cache; > > @@ -46,16 +46,21 @@ public class RemoteServiceSet implements Closeable{ > > public ClusterCoordinator getCoordinator() { > > return coordinator; > > } > > - > > - > > + > > + > > @Override > > public void close() throws IOException { > > + try{ > > cache.close(); > > + }catch(Exception e){ > > + if(e instanceof IOException) throw (IOException) e; > > + throw new IOException("Failure while closing cache", e); > > + } > > coordinator.close(); > > } > > > > public static RemoteServiceSet getLocalServiceSet(){ > > return new RemoteServiceSet(new LocalCache(), new > LocalClusterCoordinator()); > > } > > - > > + > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > > index 99e712b..20722d9 100644 > > --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > > +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > > @@ -29,7 +29,7 @@ import org.apache.drill.common.config.DrillConfig; > > import org.apache.drill.common.util.TestTools; > > import org.apache.drill.exec.ExecTest; > > import org.apache.drill.exec.cache.DistributedCache; > > -import org.apache.drill.exec.cache.LocalCache; > > +import org.apache.drill.exec.cache.local.LocalCache; > > import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; > > import org.apache.drill.exec.memory.TopLevelAllocator; > > import org.apache.drill.exec.ops.QueryContext; > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java > b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java > > new file mode 100644 > > index 0000000..13322f1 > > --- /dev/null > > +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java > > @@ -0,0 +1,94 @@ > > +/** > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. > > + */ > > +package org.apache.drill.exec.cache; > > + > > +import java.io.DataInput; > > +import java.io.DataInputStream; > > +import java.io.DataOutput; > > +import java.io.DataOutputStream; > > +import java.io.Externalizable; > > +import java.io.IOException; > > +import java.io.InputStream; > > +import java.io.ObjectInput; > > +import java.io.ObjectOutput; > > +import java.io.OutputStream; > > +import java.util.List; > > + > > +import org.infinispan.Cache; > > +import org.infinispan.configuration.cache.CacheMode; > > +import org.infinispan.configuration.cache.Configuration; > > +import org.infinispan.configuration.cache.ConfigurationBuilder; > > +import org.infinispan.configuration.global.GlobalConfiguration; > > +import org.infinispan.configuration.global.GlobalConfigurationBuilder; > > +import org.infinispan.manager.DefaultCacheManager; > > +import org.infinispan.manager.EmbeddedCacheManager; > > + > > +import com.google.hive12.common.collect.Lists; > > + > > +public class ISpan { > > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(ISpan.class); > > + > > + > > + public static void main(String[] args) throws Exception{ > > + GlobalConfiguration gc = new > GlobalConfigurationBuilder().transport().defaultTransport().build(); > > + Configuration c = new ConfigurationBuilder() // > > + .clustering().cacheMode(CacheMode.DIST_ASYNC) // > > + .storeAsBinary() > > + .build(); > > + EmbeddedCacheManager ecm = new DefaultCacheManager(gc, c); > > + > > + Cache<String, List<XT>> cache = ecm.getCache(); > > + List<XT> items = Lists.newArrayList(); > > + items.add(new XT(1)); > > + items.add(new XT(2)); > > + > > + cache.put("items", items); > > + for(XT x : cache.get("items")){ > > + System.out.println(x.i); > > + } > > + > > + > > + } > > + > > + private static class XT extends AbstractDataSerializable{ > > + > > + int i =0; > > + > > + > > + public XT(int i) { > > + super(); > > + this.i = i; > > + } > > + > > + @Override > > + public void read(DataInput input) throws IOException { > > + i = input.readInt(); > > + } > > + > > + @Override > > + public void write(DataOutput output) throws IOException { > > + output.writeInt(i); > > + } > > + > > + @Override > > + public String toString() { > > + return "XT [i=" + i + "]"; > > + } > > + > > + } > > +} > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > > index a3d39a3..7686614 100644 > > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > > +++ > b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > > @@ -17,7 +17,7 @@ > > */ > > package org.apache.drill.exec.cache; > > > > -import com.beust.jcommander.internal.Lists; > > +import java.util.List; > > > > import org.apache.drill.common.config.DrillConfig; > > import org.apache.drill.common.expression.ExpressionPosition; > > @@ -25,67 +25,105 @@ import > org.apache.drill.common.expression.SchemaPath; > > import org.apache.drill.common.types.TypeProtos; > > import org.apache.drill.common.types.Types; > > import org.apache.drill.exec.ExecTest; > > +import org.apache.drill.exec.cache.hazel.HazelCache; > > +import org.apache.drill.exec.cache.infinispan.ICache; > > import org.apache.drill.exec.expr.TypeHelper; > > -import org.apache.drill.exec.record.*; > > +import org.apache.drill.exec.memory.TopLevelAllocator; > > +import org.apache.drill.exec.record.MaterializedField; > > +import org.apache.drill.exec.record.VectorAccessible; > > +import org.apache.drill.exec.record.VectorContainer; > > +import org.apache.drill.exec.record.VectorWrapper; > > +import org.apache.drill.exec.record.WritableBatch; > > import org.apache.drill.exec.server.Drillbit; > > import org.apache.drill.exec.server.DrillbitContext; > > import org.apache.drill.exec.server.RemoteServiceSet; > > -import org.apache.drill.exec.vector.*; > > +import org.apache.drill.exec.vector.AllocationHelper; > > +import org.apache.drill.exec.vector.IntVector; > > +import org.apache.drill.exec.vector.ValueVector; > > +import org.apache.drill.exec.vector.VarBinaryVector; > > import org.junit.Test; > > > > -import java.util.List; > > +import com.beust.jcommander.internal.Lists; > > > > -public class TestVectorCache extends ExecTest{ > > +public class TestVectorCache extends ExecTest{ > > > > - @Test > > - public void testVectorCache() throws Exception { > > + private void testCache(DrillConfig config, DistributedCache dcache) > throws Exception { > > List<ValueVector> vectorList = Lists.newArrayList(); > > RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); > > - DrillConfig config = DrillConfig.create(); > > - Drillbit bit = new Drillbit(config, serviceSet); > > - bit.run(); > > - DrillbitContext context = bit.getContext(); > > - HazelCache cache = new HazelCache(config, context.getAllocator()); > > - cache.run(); > > > > - MaterializedField intField = > MaterializedField.create(SchemaPath.getSimplePath("int"), Types.required( > TypeProtos.MinorType.INT)); > > - IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, > context.getAllocator()); > > - MaterializedField binField = MaterializedField.create(new > SchemaPath("binary", ExpressionPosition.UNKNOWN), > Types.required(TypeProtos.MinorType.VARBINARY)); > > - VarBinaryVector binVector = > (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator()); > > - AllocationHelper.allocate(intVector, 4, 4); > > - AllocationHelper.allocate(binVector, 4, 5); > > - vectorList.add(intVector); > > - vectorList.add(binVector); > > - > > - intVector.getMutator().setSafe(0, 0); > binVector.getMutator().setSafe(0, "ZERO".getBytes()); > > - intVector.getMutator().setSafe(1, 1); > binVector.getMutator().setSafe(1, "ONE".getBytes()); > > - intVector.getMutator().setSafe(2, 2); > binVector.getMutator().setSafe(2, "TWO".getBytes()); > > - intVector.getMutator().setSafe(3, 3); > binVector.getMutator().setSafe(3, "THREE".getBytes()); > > - intVector.getMutator().setValueCount(4); > > - binVector.getMutator().setValueCount(4); > > - > > - VectorContainer container = new VectorContainer(); > > - container.addCollection(vectorList); > > - container.setRecordCount(4); > > - WritableBatch batch = > WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, > false); > > - VectorAccessibleSerializable wrap = new > VectorAccessibleSerializable(batch, context.getAllocator()); > > - > > - DistributedMultiMap<VectorAccessibleSerializable> mmap = > cache.getMultiMap(VectorAccessibleSerializable.class); > > - mmap.put("vectors", wrap); > > - VectorAccessibleSerializable newWrap = > (VectorAccessibleSerializable)mmap.get("vectors").iterator().next(); > > - > > - VectorAccessible newContainer = newWrap.get(); > > - for (VectorWrapper w : newContainer) { > > - ValueVector vv = w.getValueVector(); > > - int values = vv.getAccessor().getValueCount(); > > - for (int i = 0; i < values; i++) { > > - Object o = vv.getAccessor().getObject(i); > > - if (o instanceof byte[]) { > > - System.out.println(new String((byte[])o)); > > - } else { > > - System.out.println(o); > > + try (Drillbit bit = new Drillbit(config, serviceSet); > DistributedCache cache = dcache) { > > + bit.run(); > > + cache.run(); > > + > > + DrillbitContext context = bit.getContext(); > > + > > + > > + MaterializedField intField = MaterializedField.create(new > SchemaPath("int", ExpressionPosition.UNKNOWN), > > + Types.required(TypeProtos.MinorType.INT)); > > + IntVector intVector = (IntVector) > TypeHelper.getNewVector(intField, context.getAllocator()); > > + MaterializedField binField = MaterializedField.create(new > SchemaPath("binary", ExpressionPosition.UNKNOWN), > > + Types.required(TypeProtos.MinorType.VARBINARY)); > > + VarBinaryVector binVector = (VarBinaryVector) > TypeHelper.getNewVector(binField, context.getAllocator()); > > + AllocationHelper.allocate(intVector, 4, 4); > > + AllocationHelper.allocate(binVector, 4, 5); > > + vectorList.add(intVector); > > + vectorList.add(binVector); > > + > > + intVector.getMutator().set(0, 0); > > + binVector.getMutator().set(0, "ZERO".getBytes()); > > + intVector.getMutator().set(1, 1); > > + binVector.getMutator().set(1, "ONE".getBytes()); > > + intVector.getMutator().set(2, 2); > > + binVector.getMutator().set(2, "TWO".getBytes()); > > + intVector.getMutator().set(3, 3); > > + binVector.getMutator().set(3, "THREE".getBytes()); > > + intVector.getMutator().setValueCount(4); > > + binVector.getMutator().setValueCount(4); > > + > > + VectorContainer container = new VectorContainer(); > > + container.addCollection(vectorList); > > + container.setRecordCount(4); > > + WritableBatch batch = > WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, > false); > > + CachedVectorContainer wrap = new CachedVectorContainer(batch, > context.getAllocator()); > > + > > + DistributedMultiMap<CachedVectorContainer> mmap = > cache.getMultiMap(CachedVectorContainer.class); > > + mmap.put("vectors", wrap); > > + > > + CachedVectorContainer newWrap = (CachedVectorContainer) > mmap.get("vectors").iterator().next(); > > + > > + VectorAccessible newContainer = newWrap.get(); > > + for (VectorWrapper<?> w : newContainer) { > > + ValueVector vv = w.getValueVector(); > > + int values = vv.getAccessor().getValueCount(); > > + for (int i = 0; i < values; i++) { > > + Object o = vv.getAccessor().getObject(i); > > + if (o instanceof byte[]) { > > + System.out.println(new String((byte[]) o)); > > + } else { > > + System.out.println(o); > > + } > > } > > } > > + > > + newWrap.clear(); > > } > > + > > + } > > + > > + @Test > > + public void testHazelVectorCache() throws Exception { > > + DrillConfig c = DrillConfig.create(); > > + HazelCache cache = new HazelCache(c, new TopLevelAllocator()); > > + cache.run(); > > + testCache(c, cache); > > + cache.close(); > > + } > > + > > + @Test > > + public void testICache() throws Exception { > > + DrillConfig c = DrillConfig.create(); > > + ICache cache = new ICache(c, new TopLevelAllocator()); > > + testCache(c, cache); > > + > > } > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > > index 63bc0a9..a5dbfe5 100644 > > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > > +++ > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > > @@ -23,7 +23,7 @@ import net.hydromatic.optiq.tools.Frameworks; > > > > import org.apache.drill.common.config.DrillConfig; > > import org.apache.drill.exec.ExecTest; > > -import org.apache.drill.exec.cache.LocalCache; > > +import org.apache.drill.exec.cache.local.LocalCache; > > import org.apache.drill.exec.memory.TopLevelAllocator; > > import org.apache.drill.exec.rpc.user.UserSession; > > import org.apache.drill.exec.server.DrillbitContext; > > > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > > ---------------------------------------------------------------------- > > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > > index acb5929..3ccb96b 100644 > > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > > +++ > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > > @@ -18,12 +18,13 @@ > > package org.apache.drill.exec.store.ischema; > > > > > > -import static org.mockito.Mockito.*; > > +import static org.mockito.Mockito.mock; > > +import static org.mockito.Mockito.when; > > import net.hydromatic.optiq.SchemaPlus; > > import net.hydromatic.optiq.tools.Frameworks; > > > > import org.apache.drill.common.config.DrillConfig; > > -import org.apache.drill.exec.cache.LocalCache; > > +import org.apache.drill.exec.cache.local.LocalCache; > > import org.apache.drill.exec.memory.TopLevelAllocator; > > import org.apache.drill.exec.rpc.user.UserSession; > > import org.apache.drill.exec.server.DrillbitContext; > > @@ -40,7 +41,7 @@ public class OrphanSchema { > > * @return root node of the created schema. > > */ > > public static SchemaPlus create() throws Exception { > > - > > + > > final DrillConfig c = DrillConfig.create(); > > > > // Mock up a context which will allow us to create a schema. > > @@ -51,7 +52,7 @@ public class OrphanSchema { > > when(bitContext.getCache()).thenReturn(new LocalCache()); > > > > bitContext.getCache().run(); > > - > > + > > // Using the mock context, get the orphan schema. > > StoragePluginRegistry r = new StoragePluginRegistry(bitContext); > > r.init(); > > >
