Any motivation for using Infinispan instead of Hazelcast? Tim
Sent from my iPhone > On May 21, 2014, at 6:14 PM, [email protected] wrote: > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java > > b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java > new file mode 100644 > index 0000000..46d4eca > --- /dev/null > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java > @@ -0,0 +1,66 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.drill.exec.cache.infinispan; > + > +import org.infinispan.marshall.core.MarshalledEntry; > +import org.infinispan.persistence.spi.ExternalStore; > +import org.infinispan.persistence.spi.InitializationContext; > + > +/** > + * Stores the cached objects in zookeeper. 
Objects are stored in > /start/cache_name/key_name = data > + * @param <K> > + * @param <V> > + */ > +public class ZookeeperCacheStore<K, V> implements ExternalStore<K, V>{ > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(ZookeeperCacheStore.class); > + > + private String cacheName; > + > + @Override > + public void init(InitializationContext ctx) { > + ctx.getConfiguration(); > + > + } > + > + @Override > + public MarshalledEntry<K, V> load(K key) { > + return null; > + } > + > + @Override > + public boolean contains(K key) { > + return false; > + } > + > + @Override > + public void start() { > + } > + > + @Override > + public void stop() { > + } > + > + @Override > + public void write(MarshalledEntry<K, V> entry) { > + } > + > + @Override > + public boolean delete(K key) { > + return false; > + } > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java > > b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java > new file mode 100644 > index 0000000..e66cc90 > --- /dev/null > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java > @@ -0,0 +1,309 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. 
You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.drill.exec.cache.local; > + > +import java.io.IOException; > +import java.io.InputStream; > +import java.io.OutputStream; > +import java.lang.reflect.InvocationTargetException; > +import java.util.Collection; > +import java.util.Iterator; > +import java.util.List; > +import java.util.Map; > +import java.util.Map.Entry; > +import java.util.concurrent.ConcurrentMap; > +import java.util.concurrent.TimeUnit; > +import java.util.concurrent.atomic.AtomicLong; > + > +import org.apache.drill.common.util.DataInputInputStream; > +import org.apache.drill.common.util.DataOutputOutputStream; > +import org.apache.drill.exec.cache.Counter; > +import org.apache.drill.exec.cache.DistributedCache; > +import org.apache.drill.exec.cache.DistributedMap; > +import org.apache.drill.exec.cache.DistributedMultiMap; > +import org.apache.drill.exec.cache.DrillSerializable; > +import org.apache.drill.exec.exception.DrillbitStartupException; > +import org.apache.drill.exec.memory.BufferAllocator; > +import org.apache.drill.exec.memory.TopLevelAllocator; > +import org.apache.drill.exec.proto.BitControl.PlanFragment; > +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; > + > +import com.fasterxml.jackson.databind.ObjectMapper; > +import com.google.common.collect.ArrayListMultimap; > +import com.google.common.collect.Lists; > +import com.google.common.collect.Maps; > +import com.google.common.io.ByteArrayDataInput; > +import com.google.common.io.ByteArrayDataOutput; > +import com.google.common.io.ByteStreams; > + > +public 
class LocalCache implements DistributedCache { > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(LocalCache.class); > + > + private volatile Map<FragmentHandle, PlanFragment> handles; > + private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps; > + private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps; > + private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps; > + private volatile ConcurrentMap<String, Counter> counters; > + private static final BufferAllocator allocator = new TopLevelAllocator(); > + > + private static final ObjectMapper mapper = > DrillConfig.create().getMapper(); > + > + @Override > + public void close() throws IOException { > + handles = null; > + } > + > + @Override > + public void run() throws DrillbitStartupException { > + handles = Maps.newConcurrentMap(); > + maps = Maps.newConcurrentMap(); > + multiMaps = Maps.newConcurrentMap(); > + counters = Maps.newConcurrentMap(); > + namedMaps = Maps.newConcurrentMap(); > + } > + > + @Override > + public PlanFragment getFragment(FragmentHandle handle) { > +// logger.debug("looking for fragment with handle: {}", handle); > + return handles.get(handle); > + } > + > + @Override > + public void storeFragment(PlanFragment fragment) { > +// logger.debug("Storing fragment: {}", fragment); > + handles.put(fragment.getHandle(), fragment); > + } > + > + @Override > + public <V extends DrillSerializable> DistributedMultiMap<V> > getMultiMap(Class<V> clazz) { > + DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) > multiMaps.get(clazz); > + if (mmap == null) { > + multiMaps.putIfAbsent(clazz, new > LocalDistributedMultiMapImpl<V>(clazz)); > + return (DistributedMultiMap<V>) multiMaps.get(clazz); > + } else { > + return mmap; > + } > + } > + > + @Override > + public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> > clazz) { > + DistributedMap m = maps.get(clazz); > + if (m == null) { > + maps.putIfAbsent(clazz, new 
LocalDistributedMapImpl<V>(clazz)); > + return (DistributedMap<V>) maps.get(clazz); > + } else { > + return m; > + } > + } > + > + > + @Override > + public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String > name, Class<V> clazz) { > + DistributedMap m = namedMaps.get(clazz); > + if (m == null) { > + namedMaps.putIfAbsent(name, new LocalDistributedMapImpl<V>(clazz)); > + return (DistributedMap<V>) namedMaps.get(name); > + } else { > + return m; > + } > + } > + > + @Override > + public Counter getCounter(String name) { > + Counter c = counters.get(name); > + if (c == null) { > + counters.putIfAbsent(name, new LocalCounterImpl()); > + return counters.get(name); > + } else { > + return c; > + } > + } > + > + public static ByteArrayDataOutput serialize(DrillSerializable obj) { > + if(obj instanceof JacksonSerializable){ > + try{ > + ByteArrayDataOutput out = ByteStreams.newDataOutput(); > + out.write(mapper.writeValueAsBytes(obj)); > + return out; > + }catch(Exception e){ > + throw new RuntimeException(e); > + } > + } > + > + ByteArrayDataOutput out = ByteStreams.newDataOutput(); > + OutputStream outputStream = > DataOutputOutputStream.constructOutputStream(out); > + try { > + obj.writeToStream(outputStream); > + } catch (IOException e) { > + throw new RuntimeException(e); > + } > + try { > + outputStream.flush(); > + } catch (IOException e) { > + throw new RuntimeException(e); > + } > + return out; > + } > + > + public static <V extends DrillSerializable> V deserialize(byte[] bytes, > Class<V> clazz) { > + if(JacksonSerializable.class.isAssignableFrom(clazz)){ > + try{ > + return (V) mapper.readValue(bytes, clazz); > + }catch(Exception e){ > + throw new RuntimeException(e); > + } > + } > + > + ByteArrayDataInput in = ByteStreams.newDataInput(bytes); > + InputStream inputStream = DataInputInputStream.constructInputStream(in); > + try { > + V obj = > clazz.getConstructor(BufferAllocator.class).newInstance(allocator); > + obj.readFromStream(inputStream); 
> + return obj; > + } catch (InstantiationException | IllegalAccessException | IOException | > NoSuchMethodException | InvocationTargetException e) { > + throw new RuntimeException(e); > + } > + } > + > + public static class LocalDistributedMultiMapImpl<V extends > DrillSerializable> implements DistributedMultiMap<V> { > + private ArrayListMultimap<String, ByteArrayDataOutput> mmap; > + private Class<V> clazz; > + > + public LocalDistributedMultiMapImpl(Class<V> clazz) { > + mmap = ArrayListMultimap.create(); > + this.clazz = clazz; > + } > + > + @Override > + public Collection<V> get(String key) { > + List<V> list = Lists.newArrayList(); > + for (ByteArrayDataOutput o : mmap.get(key)) { > + list.add(deserialize(o.toByteArray(), this.clazz)); > + } > + return list; > + } > + > + @Override > + public void put(String key, DrillSerializable value) { > + mmap.put(key, serialize(value)); > + } > + } > + > + public static class LocalDistributedMapImpl<V extends DrillSerializable> > implements DistributedMap<V> { > + protected ConcurrentMap<String, ByteArrayDataOutput> m; > + protected Class<V> clazz; > + > + public LocalDistributedMapImpl(Class<V> clazz) { > + m = Maps.newConcurrentMap(); > + this.clazz = clazz; > + } > + > + @Override > + public V get(String key) { > + if (m.get(key) == null) return null; > + ByteArrayDataOutput b = m.get(key); > + byte[] bytes = b.toByteArray(); > + return (V) deserialize(m.get(key).toByteArray(), this.clazz); > + } > + > + @Override > + public void put(String key, V value) { > + m.put(key, serialize(value)); > + } > + > + @Override > + public void putIfAbsent(String key, V value) { > + m.putIfAbsent(key, serialize(value)); > + } > + > + @Override > + public void putIfAbsent(String key, V value, long ttl, TimeUnit > timeUnit) { > + m.putIfAbsent(key, serialize(value)); > + logger.warn("Expiration not implemented in local map cache"); > + } > + > + private class DeserializingTransformer implements > Iterator<Map.Entry<String, V> >{ > + 
private Iterator<Map.Entry<String, ByteArrayDataOutput>> inner; > + > + public DeserializingTransformer(Iterator<Entry<String, > ByteArrayDataOutput>> inner) { > + super(); > + this.inner = inner; > + } > + > + @Override > + public boolean hasNext() { > + return inner.hasNext(); > + } > + > + @Override > + public Entry<String, V> next() { > + return newEntry(inner.next()); > + } > + > + @Override > + public void remove() { > + throw new UnsupportedOperationException(); > + } > + > + public Entry<String, V> newEntry(final Entry<String, > ByteArrayDataOutput> input) { > + return new Map.Entry<String, V>(){ > + > + @Override > + public String getKey() { > + return input.getKey(); > + } > + > + @Override > + public V getValue() { > + return deserialize(input.getValue().toByteArray(), clazz); > + } > + > + @Override > + public V setValue(V value) { > + throw new UnsupportedOperationException(); > + } > + > + }; > + } > + > + } > + @Override > + public Iterator<Entry<String, V>> iterator() { > + return new DeserializingTransformer(m.entrySet().iterator()); > + } > + } > + > + public static class LocalCounterImpl implements Counter { > + private AtomicLong al = new AtomicLong(); > + > + @Override > + public long get() { > + return al.get(); > + } > + > + @Override > + public long incrementAndGet() { > + return al.incrementAndGet(); > + } > + > + @Override > + public long decrementAndGet() { > + return al.decrementAndGet(); > + } > + } > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > > b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > index 
f105363..a0c439e 100644 > --- > a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java > @@ -25,13 +25,13 @@ import java.util.concurrent.TimeUnit; > > import org.apache.drill.common.expression.ErrorCollector; > import org.apache.drill.common.expression.ErrorCollectorImpl; > -import org.apache.drill.common.expression.ExpressionPosition; > import org.apache.drill.common.expression.FieldReference; > import org.apache.drill.common.expression.LogicalExpression; > import org.apache.drill.common.expression.SchemaPath; > import org.apache.drill.common.logical.data.Order.Ordering; > import org.apache.drill.common.types.TypeProtos; > import org.apache.drill.common.types.Types; > +import org.apache.drill.exec.cache.CachedVectorContainer; > import org.apache.drill.exec.cache.Counter; > import org.apache.drill.exec.cache.DistributedCache; > import org.apache.drill.exec.cache.DistributedMap; > @@ -115,9 +115,9 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > private int recordCount; > > private final IntVector partitionKeyVector; > - private final DistributedMap<VectorAccessibleSerializable> tableMap; > + private final DistributedMap<CachedVectorContainer> tableMap; > private final Counter minorFragmentSampleCount; > - private final DistributedMultiMap<VectorAccessibleSerializable> mmap; > + private final DistributedMultiMap<CachedVectorContainer> mmap; > private final String mapKey; > private List<VectorContainer> sampledIncomingBatches; > > @@ -131,8 +131,8 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > this.completionFactor = pop.getCompletionFactor(); > > DistributedCache cache = context.getDrillbitContext().getCache(); > - this.mmap = cache.getMultiMap(VectorAccessibleSerializable.class); > - this.tableMap = 
cache.getMap(VectorAccessibleSerializable.class); > + this.mmap = cache.getMultiMap(CachedVectorContainer.class); > + this.tableMap = cache.getMap(CachedVectorContainer.class); > Preconditions.checkNotNull(tableMap); > > this.mapKey = String.format("%s_%d", context.getHandle().getQueryId(), > context.getHandle().getMajorFragmentId()); > @@ -220,7 +220,7 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > // into a serializable wrapper object, and then add to distributed map > > WritableBatch batch = > WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), > containerToCache, false); > - VectorAccessibleSerializable sampleToSave = new > VectorAccessibleSerializable(batch, oContext.getAllocator()); > + CachedVectorContainer sampleToSave = new CachedVectorContainer(batch, > context.getAllocator()); > > mmap.put(mapKey, sampleToSave); > this.sampledIncomingBatches = builder.getHeldRecordBatches(); > @@ -251,7 +251,7 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > return false; > } > > - VectorAccessibleSerializable finalTable = null; > + CachedVectorContainer finalTable = null; > > long val = minorFragmentSampleCount.incrementAndGet(); > logger.debug("Incremented mfsc, got {}", val); > @@ -301,8 +301,8 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > > // Get all samples from distributed map > > - SortRecordBatchBuilder containerBuilder = new > SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); > - for (VectorAccessibleSerializable w : mmap.get(mapKey)) { > + SortRecordBatchBuilder containerBuilder = new > SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES); > + for (CachedVectorContainer w : mmap.get(mapKey)) { > containerBuilder.add(w.get()); > } > VectorContainer allSamplesContainer = new VectorContainer(); > @@ -346,7 +346,7 @@ public class OrderedPartitionRecordBatch extends > AbstractRecordBatch<OrderedPart > } > 
candidatePartitionTable.setRecordCount(copier.getOutputRecords()); > WritableBatch batch = > WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), > candidatePartitionTable, false); > - VectorAccessibleSerializable wrap = new > VectorAccessibleSerializable(batch, > context.getDrillbitContext().getAllocator()); > + CachedVectorContainer wrap = new CachedVectorContainer(batch, > context.getDrillbitContext().getAllocator()); > tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES); > > candidatePartitionTable.clear(); > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > index 7297dc3..0e3181d 100644 > --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java > @@ -22,7 +22,7 @@ import java.io.Closeable; > import org.apache.drill.common.config.DrillConfig; > import org.apache.drill.exec.ExecConstants; > import org.apache.drill.exec.cache.DistributedCache; > -import org.apache.drill.exec.cache.HazelCache; > +import org.apache.drill.exec.cache.hazel.HazelCache; > import org.apache.drill.exec.coord.ClusterCoordinator; > import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle; > import org.apache.drill.exec.coord.ZKClusterCoordinator; > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > > 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > index c0b82bd..2078107 100644 > --- > a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java > @@ -21,17 +21,17 @@ import java.io.Closeable; > import java.io.IOException; > > import org.apache.drill.exec.cache.DistributedCache; > -import org.apache.drill.exec.cache.LocalCache; > +import org.apache.drill.exec.cache.local.LocalCache; > import org.apache.drill.exec.coord.ClusterCoordinator; > import org.apache.drill.exec.coord.LocalClusterCoordinator; > import org.apache.drill.exec.exception.DrillbitStartupException; > > public class RemoteServiceSet implements Closeable{ > static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class); > - > + > private final DistributedCache cache; > private final ClusterCoordinator coordinator; > - > + > public RemoteServiceSet(DistributedCache cache, ClusterCoordinator > coordinator) { > super(); > this.cache = cache; > @@ -46,16 +46,21 @@ public class RemoteServiceSet implements Closeable{ > public ClusterCoordinator getCoordinator() { > return coordinator; > } > - > - > + > + > @Override > public void close() throws IOException { > + try{ > cache.close(); > + }catch(Exception e){ > + if(e instanceof IOException) throw (IOException) e; > + throw new IOException("Failure while closing cache", e); > + } > coordinator.close(); > } > > public static RemoteServiceSet getLocalServiceSet(){ > return new RemoteServiceSet(new LocalCache(), new > LocalClusterCoordinator()); > } > - > + > } > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > ---------------------------------------------------------------------- > diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > 
b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > index 99e712b..20722d9 100644 > --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > @@ -29,7 +29,7 @@ import org.apache.drill.common.config.DrillConfig; > import org.apache.drill.common.util.TestTools; > import org.apache.drill.exec.ExecTest; > import org.apache.drill.exec.cache.DistributedCache; > -import org.apache.drill.exec.cache.LocalCache; > +import org.apache.drill.exec.cache.local.LocalCache; > import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; > import org.apache.drill.exec.memory.TopLevelAllocator; > import org.apache.drill.exec.ops.QueryContext; > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java > b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java > new file mode 100644 > index 0000000..13322f1 > --- /dev/null > +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java > @@ -0,0 +1,94 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. 
You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.drill.exec.cache; > + > +import java.io.DataInput; > +import java.io.DataInputStream; > +import java.io.DataOutput; > +import java.io.DataOutputStream; > +import java.io.Externalizable; > +import java.io.IOException; > +import java.io.InputStream; > +import java.io.ObjectInput; > +import java.io.ObjectOutput; > +import java.io.OutputStream; > +import java.util.List; > + > +import org.infinispan.Cache; > +import org.infinispan.configuration.cache.CacheMode; > +import org.infinispan.configuration.cache.Configuration; > +import org.infinispan.configuration.cache.ConfigurationBuilder; > +import org.infinispan.configuration.global.GlobalConfiguration; > +import org.infinispan.configuration.global.GlobalConfigurationBuilder; > +import org.infinispan.manager.DefaultCacheManager; > +import org.infinispan.manager.EmbeddedCacheManager; > + > +import com.google.hive12.common.collect.Lists; > + > +public class ISpan { > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(ISpan.class); > + > + > + public static void main(String[] args) throws Exception{ > + GlobalConfiguration gc = new > GlobalConfigurationBuilder().transport().defaultTransport().build(); > + Configuration c = new ConfigurationBuilder() // > + .clustering().cacheMode(CacheMode.DIST_ASYNC) // > + .storeAsBinary() > + .build(); > + EmbeddedCacheManager ecm = new DefaultCacheManager(gc, c); > + > + Cache<String, List<XT>> cache = ecm.getCache(); > + List<XT> items = Lists.newArrayList(); > + items.add(new XT(1)); > + items.add(new 
XT(2)); > + > + cache.put("items", items); > + for(XT x : cache.get("items")){ > + System.out.println(x.i); > + } > + > + > + } > + > + private static class XT extends AbstractDataSerializable{ > + > + int i =0; > + > + > + public XT(int i) { > + super(); > + this.i = i; > + } > + > + @Override > + public void read(DataInput input) throws IOException { > + i = input.readInt(); > + } > + > + @Override > + public void write(DataOutput output) throws IOException { > + output.writeInt(i); > + } > + > + @Override > + public String toString() { > + return "XT [i=" + i + "]"; > + } > + > + } > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > > b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > index a3d39a3..7686614 100644 > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > +++ > b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java > @@ -17,7 +17,7 @@ > */ > package org.apache.drill.exec.cache; > > -import com.beust.jcommander.internal.Lists; > +import java.util.List; > > import org.apache.drill.common.config.DrillConfig; > import org.apache.drill.common.expression.ExpressionPosition; > @@ -25,67 +25,105 @@ import org.apache.drill.common.expression.SchemaPath; > import org.apache.drill.common.types.TypeProtos; > import org.apache.drill.common.types.Types; > import org.apache.drill.exec.ExecTest; > +import org.apache.drill.exec.cache.hazel.HazelCache; > +import org.apache.drill.exec.cache.infinispan.ICache; > import org.apache.drill.exec.expr.TypeHelper; > -import org.apache.drill.exec.record.*; > +import org.apache.drill.exec.memory.TopLevelAllocator; > +import org.apache.drill.exec.record.MaterializedField; 
> +import org.apache.drill.exec.record.VectorAccessible; > +import org.apache.drill.exec.record.VectorContainer; > +import org.apache.drill.exec.record.VectorWrapper; > +import org.apache.drill.exec.record.WritableBatch; > import org.apache.drill.exec.server.Drillbit; > import org.apache.drill.exec.server.DrillbitContext; > import org.apache.drill.exec.server.RemoteServiceSet; > -import org.apache.drill.exec.vector.*; > +import org.apache.drill.exec.vector.AllocationHelper; > +import org.apache.drill.exec.vector.IntVector; > +import org.apache.drill.exec.vector.ValueVector; > +import org.apache.drill.exec.vector.VarBinaryVector; > import org.junit.Test; > > -import java.util.List; > +import com.beust.jcommander.internal.Lists; > > -public class TestVectorCache extends ExecTest{ > +public class TestVectorCache extends ExecTest{ > > - @Test > - public void testVectorCache() throws Exception { > + private void testCache(DrillConfig config, DistributedCache dcache) throws > Exception { > List<ValueVector> vectorList = Lists.newArrayList(); > RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); > - DrillConfig config = DrillConfig.create(); > - Drillbit bit = new Drillbit(config, serviceSet); > - bit.run(); > - DrillbitContext context = bit.getContext(); > - HazelCache cache = new HazelCache(config, context.getAllocator()); > - cache.run(); > > - MaterializedField intField = > MaterializedField.create(SchemaPath.getSimplePath("int"), > Types.required(TypeProtos.MinorType.INT)); > - IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, > context.getAllocator()); > - MaterializedField binField = MaterializedField.create(new > SchemaPath("binary", ExpressionPosition.UNKNOWN), > Types.required(TypeProtos.MinorType.VARBINARY)); > - VarBinaryVector binVector = > (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator()); > - AllocationHelper.allocate(intVector, 4, 4); > - AllocationHelper.allocate(binVector, 4, 5); > - 
vectorList.add(intVector); > - vectorList.add(binVector); > - > - intVector.getMutator().setSafe(0, 0); binVector.getMutator().setSafe(0, > "ZERO".getBytes()); > - intVector.getMutator().setSafe(1, 1); binVector.getMutator().setSafe(1, > "ONE".getBytes()); > - intVector.getMutator().setSafe(2, 2); binVector.getMutator().setSafe(2, > "TWO".getBytes()); > - intVector.getMutator().setSafe(3, 3); binVector.getMutator().setSafe(3, > "THREE".getBytes()); > - intVector.getMutator().setValueCount(4); > - binVector.getMutator().setValueCount(4); > - > - VectorContainer container = new VectorContainer(); > - container.addCollection(vectorList); > - container.setRecordCount(4); > - WritableBatch batch = > WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false); > - VectorAccessibleSerializable wrap = new > VectorAccessibleSerializable(batch, context.getAllocator()); > - > - DistributedMultiMap<VectorAccessibleSerializable> mmap = > cache.getMultiMap(VectorAccessibleSerializable.class); > - mmap.put("vectors", wrap); > - VectorAccessibleSerializable newWrap = > (VectorAccessibleSerializable)mmap.get("vectors").iterator().next(); > - > - VectorAccessible newContainer = newWrap.get(); > - for (VectorWrapper w : newContainer) { > - ValueVector vv = w.getValueVector(); > - int values = vv.getAccessor().getValueCount(); > - for (int i = 0; i < values; i++) { > - Object o = vv.getAccessor().getObject(i); > - if (o instanceof byte[]) { > - System.out.println(new String((byte[])o)); > - } else { > - System.out.println(o); > + try (Drillbit bit = new Drillbit(config, serviceSet); DistributedCache > cache = dcache) { > + bit.run(); > + cache.run(); > + > + DrillbitContext context = bit.getContext(); > + > + > + MaterializedField intField = MaterializedField.create(new > SchemaPath("int", ExpressionPosition.UNKNOWN), > + Types.required(TypeProtos.MinorType.INT)); > + IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, > context.getAllocator()); > + 
MaterializedField binField = MaterializedField.create(new > SchemaPath("binary", ExpressionPosition.UNKNOWN), > + Types.required(TypeProtos.MinorType.VARBINARY)); > + VarBinaryVector binVector = (VarBinaryVector) > TypeHelper.getNewVector(binField, context.getAllocator()); > + AllocationHelper.allocate(intVector, 4, 4); > + AllocationHelper.allocate(binVector, 4, 5); > + vectorList.add(intVector); > + vectorList.add(binVector); > + > + intVector.getMutator().set(0, 0); > + binVector.getMutator().set(0, "ZERO".getBytes()); > + intVector.getMutator().set(1, 1); > + binVector.getMutator().set(1, "ONE".getBytes()); > + intVector.getMutator().set(2, 2); > + binVector.getMutator().set(2, "TWO".getBytes()); > + intVector.getMutator().set(3, 3); > + binVector.getMutator().set(3, "THREE".getBytes()); > + intVector.getMutator().setValueCount(4); > + binVector.getMutator().setValueCount(4); > + > + VectorContainer container = new VectorContainer(); > + container.addCollection(vectorList); > + container.setRecordCount(4); > + WritableBatch batch = > WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false); > + CachedVectorContainer wrap = new CachedVectorContainer(batch, > context.getAllocator()); > + > + DistributedMultiMap<CachedVectorContainer> mmap = > cache.getMultiMap(CachedVectorContainer.class); > + mmap.put("vectors", wrap); > + > + CachedVectorContainer newWrap = (CachedVectorContainer) > mmap.get("vectors").iterator().next(); > + > + VectorAccessible newContainer = newWrap.get(); > + for (VectorWrapper<?> w : newContainer) { > + ValueVector vv = w.getValueVector(); > + int values = vv.getAccessor().getValueCount(); > + for (int i = 0; i < values; i++) { > + Object o = vv.getAccessor().getObject(i); > + if (o instanceof byte[]) { > + System.out.println(new String((byte[]) o)); > + } else { > + System.out.println(o); > + } > } > } > + > + newWrap.clear(); > } > + > + } > + > + @Test > + public void testHazelVectorCache() throws Exception { > + 
DrillConfig c = DrillConfig.create(); > + HazelCache cache = new HazelCache(c, new TopLevelAllocator()); > + cache.run(); > + testCache(c, cache); > + cache.close(); > + } > + > + @Test > + public void testICache() throws Exception { > + DrillConfig c = DrillConfig.create(); > + ICache cache = new ICache(c, new TopLevelAllocator()); > + testCache(c, cache); > + > } > } > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > index 63bc0a9..a5dbfe5 100644 > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > +++ > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java > @@ -23,7 +23,7 @@ import net.hydromatic.optiq.tools.Frameworks; > > import org.apache.drill.common.config.DrillConfig; > import org.apache.drill.exec.ExecTest; > -import org.apache.drill.exec.cache.LocalCache; > +import org.apache.drill.exec.cache.local.LocalCache; > import org.apache.drill.exec.memory.TopLevelAllocator; > import org.apache.drill.exec.rpc.user.UserSession; > import org.apache.drill.exec.server.DrillbitContext; > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > index acb5929..3ccb96b 100644 > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > +++ > 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java > @@ -18,12 +18,13 @@ > package org.apache.drill.exec.store.ischema; > > > -import static org.mockito.Mockito.*; > +import static org.mockito.Mockito.mock; > +import static org.mockito.Mockito.when; > import net.hydromatic.optiq.SchemaPlus; > import net.hydromatic.optiq.tools.Frameworks; > > import org.apache.drill.common.config.DrillConfig; > -import org.apache.drill.exec.cache.LocalCache; > +import org.apache.drill.exec.cache.local.LocalCache; > import org.apache.drill.exec.memory.TopLevelAllocator; > import org.apache.drill.exec.rpc.user.UserSession; > import org.apache.drill.exec.server.DrillbitContext; > @@ -40,7 +41,7 @@ public class OrphanSchema { > * @return root node of the created schema. > */ > public static SchemaPlus create() throws Exception { > - > + > final DrillConfig c = DrillConfig.create(); > > // Mock up a context which will allow us to create a schema. > @@ -51,7 +52,7 @@ public class OrphanSchema { > when(bitContext.getCache()).thenReturn(new LocalCache()); > > bitContext.getCache().run(); > - > + > // Using the mock context, get the orphan schema. > StoragePluginRegistry r = new StoragePluginRegistry(bitContext); > r.init(); >
