Repository: accumulo Updated Branches: refs/heads/IGNITE d1222954a -> d29b00837
IGNITE: fixes for using BinaryObject Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d29b0083 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d29b0083 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d29b0083 Branch: refs/heads/IGNITE Commit: d29b00837d7f9e03afb9bcb146f29667eb2735be Parents: d122295 Author: Dave Marion <[email protected]> Authored: Tue May 23 15:45:37 2017 -0400 Committer: Dave Marion <[email protected]> Committed: Tue May 23 15:45:37 2017 -0400 ---------------------------------------------------------------------- .../cache/tiered/TieredBlockCache.java | 96 +++++++++++++------- .../tiered/TieredBlockCacheConfiguration.java | 1 + .../cache/tiered/TieredBlockCacheManager.java | 9 +- 3 files changed, 71 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/d29b0083/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java index 74706ca..6086137 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.file.blockfile.cache.tiered; import static java.util.Objects.requireNonNull; +import java.lang.ref.SoftReference; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -28,6 +29,7 @@ import javax.cache.processor.MutableEntry; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; +import org.apache.accumulo.core.file.rfile.BlockIndex; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCache; @@ -36,6 +38,7 @@ import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,76 +56,103 @@ public class TieredBlockCache implements BlockCache { private final IgniteCache<String,BinaryObject> cache; - private BinaryObject binary; + private BinaryObject binaryObject; private transient byte[] buffer; - private transient Object idx; + private transient BlockIndex idx; - private LazyBlock(BinaryObject binary, IgniteCache<String,BinaryObject> cache) { - this.binary = binary; + private LazyBlock(BinaryObject object, IgniteCache<String,BinaryObject> cache) { + this.binaryObject = object; this.cache = cache; } @Override public byte[] getBuffer() { if (null == buffer) { - buffer = this.binary.field(BUFFER_FIELD); + buffer = this.binaryObject.field(BUFFER_FIELD); } return buffer; } @Override public Object getIndex() { - if (null == idx) { - idx = this.binary.field(INDEX_FIELD); + if (null == idx && this.binaryObject.hasField(INDEX_FIELD)) { + Object o = this.binaryObject.field(INDEX_FIELD); + if (o instanceof BinaryObjectImpl) { + BinaryObjectImpl b = (BinaryObjectImpl) o; + idx = b.deserialize(); + } else if (o instanceof BlockIndex) { + idx = (BlockIndex) o; + } else { + throw new RuntimeException("Unknown object type: " + o.getClass().getName()); + } } - return idx; + return new SoftReference<BlockIndex>(idx); } private void resetBinary(BinaryObject binary) { - this.binary = binary; + this.binaryObject = binary; this.buffer = null; this.idx = null; } @Override public void setIndex(final Object idx) { - String key = this.binary.field(KEY_FIELD); - this.cache.invoke(key, new CacheEntryProcessor<String,BinaryObject,Void>() { - private static final long serialVersionUID = 1L; - - @Override - public Void process(MutableEntry<String,BinaryObject> entry, Object... arguments) throws EntryProcessorException { - BinaryObjectBuilder builder = entry.getValue().toBuilder(); - builder.setField(INDEX_FIELD, idx); - BinaryObject update = builder.build(); - entry.setValue(update); - resetBinary(binary); - return null; + if (idx instanceof SoftReference<?>) { + SoftReference<?> ref = (SoftReference<?>) idx; + Object o = ref.get(); + if (o == null) { + throw new IllegalStateException("Index is not set"); + } + if (o instanceof BlockIndex) { + final BlockIndex bi = (BlockIndex) o; + final String key = this.binaryObject.field(KEY_FIELD); + this.cache.invoke(key, new CacheEntryProcessor<String,BinaryObject,Void>() { + private static final long serialVersionUID = 1L; + + @Override + public Void process(MutableEntry<String,BinaryObject> entry, Object... arguments) throws EntryProcessorException { + // final BinaryObjectBuilder builder = igniteBinary.builder(LazyBlock.class.getName()); + // final String k = entry.getValue().field(KEY_FIELD); + // final byte[] b = entry.getValue().field(BUFFER_FIELD); + // builder.setField(KEY_FIELD, k); + // builder.setField(BUFFER_FIELD, b); + final BinaryObjectBuilder builder = entry.getValue().toBuilder(); + builder.setField(INDEX_FIELD, bi, BlockIndex.class); + final BinaryObject update = builder.build(); + entry.setValue(update); + resetBinary(update); + return null; + } + }); + } else { + throw new UnsupportedOperationException("Object is not a BlockIndex, is a: " + o.getClass().getName()); } - }); + } else { + throw new UnsupportedOperationException("Unhandled object type"); + } } /** * Convert a BinaryObject to a LazyBlock. Use this method instead of BinaryObject.deserialize as it sets the reference to the IgniteCache instance to use * when setIndex is invoked * - * @param binary + * @param object * {@link BinaryObject} representing this LazyBlock * @param cache * reference to the {@link IgniteCache} whence the {@link BinaryObject} is stored * @return instance of {@link LazyBlock} */ - static LazyBlock fromIgniteBinaryObject(BinaryObject binary, IgniteCache<String,BinaryObject> cache) { - requireNonNull(binary); + static LazyBlock fromIgniteBinaryObject(BinaryObject object, IgniteCache<String,BinaryObject> cache) { + requireNonNull(object); requireNonNull(cache); - return new LazyBlock(binary, cache); + return new LazyBlock(object, cache); } /** * Convert a byte array representing a block to a binary object to put into the ignite cache * - * @param binary + * @param igniteBinary * binary interface for {@link Ignite} * @param key * they key that this block will be stored under in the cache @@ -130,11 +160,11 @@ public class TieredBlockCache implements BlockCache { * the byte array representing the block * @return {@link BinaryObject} instance */ - static BinaryObject toIgniteBinaryObject(IgniteBinary binary, String key, byte[] block) { + static BinaryObject toIgniteBinaryObject(IgniteBinary igniteBinary, String key, byte[] block) { requireNonNull(key); requireNonNull(block); - requireNonNull(binary); - BinaryObjectBuilder builder = binary.builder(LazyBlock.class.getName()); + requireNonNull(igniteBinary); + BinaryObjectBuilder builder = igniteBinary.builder(LazyBlock.class.getName()); builder.setField(KEY_FIELD, key); builder.setField(BUFFER_FIELD, block); return builder.build(); @@ -144,7 +174,7 @@ public class TieredBlockCache implements BlockCache { private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCache.class); private final IgniteCache<String,BinaryObject> cache; - private final IgniteBinary binary; + private final IgniteBinary igniteBinary; private final CacheMetrics metrics; private final TieredBlockCacheConfiguration conf; private final AtomicLong hitCount = new AtomicLong(0); @@ -154,7 +184,7 @@ public class TieredBlockCache implements BlockCache { public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite ignite) { this.conf = conf; this.cache = ignite.getOrCreateCache(conf.getConfiguration()).withKeepBinary(); - this.binary = ignite.binary(); + this.igniteBinary = ignite.binary(); metrics = cache.localMxBean(); LOG.info("Created {} cache with configuration {}", conf.getConfiguration().getName(), conf.getConfiguration()); this.future = TieredBlockCacheManager.SCHEDULER.scheduleAtFixedRate(new Runnable() { @@ -191,7 +221,7 @@ public class TieredBlockCache implements BlockCache { @Override public CacheEntry cacheBlock(String blockName, byte[] buf) { - BinaryObject bo = this.cache.getAndPut(blockName, LazyBlock.toIgniteBinaryObject(binary, blockName, buf)); + BinaryObject bo = this.cache.getAndPut(blockName, LazyBlock.toIgniteBinaryObject(igniteBinary, blockName, buf)); if (null != bo) { return LazyBlock.fromIgniteBinaryObject(bo, this.cache); } else { http://git-wip-us.apache.org/repos/asf/accumulo/blob/d29b0083/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java index 382b2db..039bc85 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java @@ -58,6 +58,7 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration { configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.valueOf(unit), time))); configuration.setStatisticsEnabled(true); configuration.setCopyOnRead(false); + configuration.setStoreKeepBinary(true); } public CacheConfiguration<String,BinaryObject> getConfiguration() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/d29b0083/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java index 437c0e8..7e9b9e1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java @@ -17,7 +17,8 @@ */ package org.apache.accumulo.core.file.blockfile.cache.tiered; -import java.util.Collections; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -30,6 +31,7 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; import org.apache.accumulo.core.file.blockfile.cache.CacheType; import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache.LazyBlock; +import org.apache.accumulo.core.file.rfile.BlockIndex; import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.ignite.Ignite; @@ -82,7 +84,10 @@ public class TieredBlockCacheManager extends BlockCacheManager { cfg.setDaemon(true); BinaryConfiguration binaryCfg = new BinaryConfiguration(); - binaryCfg.setClassNames(Collections.singleton(LazyBlock.class.getName())); + List<String> classes = new ArrayList<>(); + classes.add(LazyBlock.class.getName()); + classes.add(BlockIndex.class.getName()); + binaryCfg.setClassNames(classes); cfg.setBinaryConfiguration(binaryCfg); // Global Off-Heap Page memory configuration.
