Repository: ignite Updated Branches: refs/heads/ignite-4465 c111648b6 -> 7e377dce9
IGNITE-4408: Allow BinaryObjects pass to IndexingSpi. This closes #1353. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/708cc8c6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/708cc8c6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/708cc8c6 Branch: refs/heads/ignite-4465 Commit: 708cc8c6849b21063a555895671f6f820d92184a Parents: c103ac3 Author: Andrey V. Mashenkov <[email protected]> Authored: Thu Dec 22 12:48:58 2016 +0300 Committer: Andrey V. Mashenkov <[email protected]> Committed: Thu Dec 22 12:48:58 2016 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 8 + .../processors/cache/IgniteCacheProxy.java | 2 +- .../cache/query/GridCacheQueryAdapter.java | 2 +- .../processors/query/GridQueryProcessor.java | 36 +++- .../apache/ignite/spi/indexing/IndexingSpi.java | 3 + .../cache/query/IndexingSpiQuerySelfTest.java | 199 ++++++++++++++++++- 6 files changed, 229 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index de6cbed..fe78d88 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -500,6 +500,14 @@ public final class IgniteSystemProperties { public static final String IGNITE_UNALIGNED_MEMORY_ACCESS = "IGNITE_UNALIGNED_MEMORY_ACCESS"; /** + * When set to {@code true} BinaryObject will be unwrapped before passing to IndexingSpi to preserve + * old behavior query processor with 
IndexingSpi. + * <p> + * @deprecated Should be removed in Apache Ignite 2.0. + */ + public static final String IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI = "IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f87fa1d..b9737c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -848,7 +848,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V */ private void validate(Query qry) { if (!GridQueryProcessor.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) && - !(qry instanceof ContinuousQuery)) + !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery)) throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() + ". 
Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable."); http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 2355591..b29e5e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -430,7 +430,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @throws IgniteCheckedException If query is invalid. */ public void validate() throws IgniteCheckedException { - if ((type != SCAN && type != SET) && !GridQueryProcessor.isEnabled(cctx.config())) + if ((type != SCAN && type != SET && type != SPI) && !GridQueryProcessor.isEnabled(cctx.config())) throw new IgniteCheckedException("Indexing is disabled for cache: " + cctx.cache().name()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 8befa0e..6c093ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -44,7 +44,7 @@ import javax.cache.Cache; import 
javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryField; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryType; @@ -160,6 +160,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** */ private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>(); + /** Default is {@code true}. */ + private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI); + /** * @param ctx Kernal context. */ @@ -680,7 +683,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (ctx.indexing().enabled()) { coctx = cacheObjectContext(space); - ctx.indexing().store(space, key.value(coctx, false), val.value(coctx, false), expirationTime); + Object key0 = unwrap(key, coctx); + + Object val0 = unwrap(val, coctx); + + ctx.indexing().store(space, key0, val0, expirationTime); } if (idx == null) @@ -736,6 +743,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Unwrap CacheObject if needed. + */ + private Object unwrap(CacheObject obj, CacheObjectContext coctx) { + return isIndexingSpiAllowsBinary && ctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false); + } + + /** * @throws IgniteCheckedException If failed. 
*/ private void checkEnabled() throws IgniteCheckedException { @@ -1025,7 +1039,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (ctx.indexing().enabled()) { CacheObjectContext coctx = cacheObjectContext(space); - ctx.indexing().remove(space, key.value(coctx, false)); + Object key0 = unwrap(key, coctx); + + ctx.indexing().remove(space, key0); } if (idx == null) @@ -1168,11 +1184,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (ctx.indexing().enabled()) { CacheObjectContext coctx = cacheObjectContext(spaceName); - ctx.indexing().onSwap( - spaceName, - key.value( - coctx, - false)); + Object key0 = unwrap(key, coctx); + + ctx.indexing().onSwap(spaceName, key0); } if (idx == null) @@ -1207,7 +1221,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (ctx.indexing().enabled()) { CacheObjectContext coctx = cacheObjectContext(spaceName); - ctx.indexing().onUnswap(spaceName, key.value(coctx, false), val.value(coctx, false)); + Object key0 = unwrap(key, coctx); + + Object val0 = unwrap(val, coctx); + + ctx.indexing().onUnswap(spaceName, key0, val0); } if (idx == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java index a3ea33e..bbe27c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java @@ -35,6 +35,9 @@ import org.jetbrains.annotations.Nullable; * methods. Note again that calling methods from this interface on the obtained instance can lead * to undefined behavior and explicitly not supported. 
* + * <b>NOTE:</b> Key and value arguments of IndexingSpi methods can be {@link org.apache.ignite.binary.BinaryObject} instances. + * BinaryObjects can be deserialized manually if the original objects are needed. + * * Here is a Java example on how to configure SPI. * <pre name="code" class="java"> * IndexingSpi spi = new MyIndexingSpi(); http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java index 94b0c8a..f66b99e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java @@ -17,11 +17,22 @@ package org.apache.ignite.internal.processors.cache.query; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import javax.cache.Cache; import junit.framework.TestCase; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SpiQuery; @@ -40,17 +51,9 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import 
org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import javax.cache.Cache; - /** * Indexing Spi query test */ @@ -88,6 +91,94 @@ public class IndexingSpiQuerySelfTest extends TestCase { /** * @throws Exception If failed. */ + public void testIndexingSpiWithDisabledQueryProcessor() throws Exception { + IgniteConfiguration cfg = configuration(); + + cfg.setIndexingSpi(new MyIndexingSpi()); + + Ignite ignite = Ignition.start(cfg); + + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache"); + + IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg); + + for (int i = 0; i < 10; i++) + cache.put(i, i); + + QueryCursor<Cache.Entry<Integer, Integer>> cursor = cache.query(new SpiQuery<Integer, Integer>().setArgs(2, 5)); + + for (Cache.Entry<Integer, Integer> entry : cursor) + System.out.println(entry); + } + + /** + * @throws Exception If failed. 
+ */ + public void testBinaryIndexingSpi() throws Exception { + IgniteConfiguration cfg = configuration(); + + cfg.setIndexingSpi(new MyBinaryIndexingSpi()); + + Ignite ignite = Ignition.start(cfg); + + CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache"); + + ccfg.setIndexedTypes(PersonKey.class, Person.class); + + IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg); + + for (int i = 0; i < 10; i++) { + PersonKey key = new PersonKey(i); + + cache.put(key, new Person("John Doe " + i)); + } + + QueryCursor<Cache.Entry<PersonKey, Person>> cursor = cache.query( + new SpiQuery<PersonKey, Person>().setArgs(new PersonKey(2), new PersonKey(5))); + + for (Cache.Entry<PersonKey, Person> entry : cursor) + System.out.println(entry); + + cache.remove(new PersonKey(9)); + } + + + /** + * @throws Exception If failed. + */ + public void testNonBinaryIndexingSpi() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI, "true"); + + IgniteConfiguration cfg = configuration(); + + cfg.setIndexingSpi(new MyIndexingSpi()); + + Ignite ignite = Ignition.start(cfg); + + CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache"); + + ccfg.setIndexedTypes(PersonKey.class, Person.class); + + IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg); + + for (int i = 0; i < 10; i++) { + PersonKey key = new PersonKey(i); + + cache.put(key, new Person("John Doe " + i)); + } + + QueryCursor<Cache.Entry<PersonKey, Person>> cursor = cache.query( + new SpiQuery<PersonKey, Person>().setArgs(new PersonKey(2), new PersonKey(5))); + + for (Cache.Entry<PersonKey, Person> entry : cursor) + System.out.println(entry); + + cache.remove(new PersonKey(9)); + } + + /** + * @throws Exception If failed. 
+ */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void testIndexingSpiFailure() throws Exception { IgniteConfiguration cfg = configuration(); @@ -173,6 +264,9 @@ public class IndexingSpiQuerySelfTest extends TestCase { Object from = paramsIt.next(); Object to = paramsIt.next(); + from = from instanceof BinaryObject ? ((BinaryObject)from).deserialize() : from; + to = to instanceof BinaryObject ? ((BinaryObject)to).deserialize() : to; + SortedMap<Object, Object> map = idx.subMap(from, to); Collection<Cache.Entry<?, ?>> res = new ArrayList<>(map.size()); @@ -186,6 +280,9 @@ public class IndexingSpiQuerySelfTest extends TestCase { /** {@inheritDoc} */ @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime) throws IgniteSpiException { + assertFalse(key instanceof BinaryObject); + assertFalse(val instanceof BinaryObject); + idx.put(key, val); } @@ -206,13 +303,95 @@ public class IndexingSpiQuerySelfTest extends TestCase { } /** + * Indexing Spi implementation for test. 
Accepts binary objects only + */ + private static class MyBinaryIndexingSpi extends MyIndexingSpi { + + /** {@inheritDoc} */ + @Override public void store(@Nullable String spaceName, Object key, Object val, + long expirationTime) throws IgniteSpiException { + assertTrue(key instanceof BinaryObject); + + assertTrue(val instanceof BinaryObject); + + super.store(spaceName, ((BinaryObject)key).deserialize(), ((BinaryObject)val).deserialize(), expirationTime); + } + + /** {@inheritDoc} */ + @Override public void remove(@Nullable String spaceName, Object key) throws IgniteSpiException { + assertTrue(key instanceof BinaryObject); + } + + /** {@inheritDoc} */ + @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteSpiException { + assertTrue(key instanceof BinaryObject); + } + + /** {@inheritDoc} */ + @Override + public void onUnswap(@Nullable String spaceName, Object key, Object val) throws IgniteSpiException { + assertTrue(key instanceof BinaryObject); + + assertTrue(val instanceof BinaryObject); + } + } + + /** * Broken Indexing Spi implementation for test */ - private class MyBrokenIndexingSpi extends MyIndexingSpi { + private static class MyBrokenIndexingSpi extends MyIndexingSpi { /** {@inheritDoc} */ @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime) throws IgniteSpiException { throw new IgniteSpiException("Test exception"); } } + + /** + * + */ + private static class PersonKey implements Serializable, Comparable<PersonKey> { + /** */ + private int id; + + /** */ + public PersonKey(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull PersonKey o) { + return Integer.compare(id, o.id); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + PersonKey key = (PersonKey)o; + + return id == key.id; + } + + /** {@inheritDoc} */ + 
@Override public int hashCode() { + return id; + } + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + private String name; + + /** */ + Person(String name) { + this.name = name; + } + } } \ No newline at end of file
