Repository: ignite Updated Branches: refs/heads/master f4c18f114 -> 9bfc823e0
IGNITE-9054 Avoid using OptimizedMarshaller with initial ScanQuery. - Fixes #4592. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9bfc823e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9bfc823e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9bfc823e Branch: refs/heads/master Commit: 9bfc823e012b052565a32ab1c82118fe0aff950f Parents: f4c18f1 Author: Ilya Kasnacheev <[email protected]> Authored: Tue Sep 4 17:26:58 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Sep 4 18:17:27 2018 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheQueryManager.java | 4 +- .../query/GridCacheQueryResponseEntry.java | 5 +- .../ignite/custom/DummyEventFilterFactory.java | 10 +- .../ContinuousQueryMarshallerTest.java | 168 +++++++++++++++++++ .../ContinuousQueryPeerClassLoadingTest.java | 4 +- .../IgniteCacheQuerySelfTestSuite3.java | 2 + 6 files changed, 183 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 281400e..982006f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1278,7 +1278,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte continue; } else - data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val)); + data.add(new T2<>(key, val)); } if (!loc) { @@ -3119,7 +3119,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } } else - next0 = !locNode ? new GridCacheQueryResponseEntry<>(key0, val0): + next0 = !locNode ? new T2<>(key0, val0): new CacheQueryEntry<>(key0, val0); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java index 2c1a4f5..650f0c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java @@ -27,7 +27,10 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; /** * Class to store query results returned by remote nodes. It's required to fully * control serialization process. Local entries can be returned to user as is. + * <p> + * @deprecated Should be removed in Apache Ignite 3.0. */ +@Deprecated public class GridCacheQueryResponseEntry<K, V> implements Map.Entry<K, V>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -113,4 +116,4 @@ public class GridCacheQueryResponseEntry<K, V> implements Map.Entry<K, V>, Exter @Override public String toString() { return "[" + key + "=" + val + "]"; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java index e0688bc..103e6a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java +++ b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java @@ -25,22 +25,22 @@ import javax.cache.event.CacheEntryListenerException; /** * Must be not in org.apache.ignite.internal */ -public class DummyEventFilterFactory implements Factory<CacheEntryEventFilter<Integer, String>> { +public class DummyEventFilterFactory<T> implements Factory<CacheEntryEventFilter<Integer, T>> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public CacheEntryEventFilter<Integer, String> create() { - return new DummyEventFilter(); + @Override public CacheEntryEventFilter<Integer, T> create() { + return new DummyEventFilter<T>(); } /** * */ - private static class DummyEventFilter implements CacheEntryEventFilter<Integer, String> { + private static class DummyEventFilter<T> implements CacheEntryEventFilter<Integer, T> { /** {@inheritDoc} */ @Override public boolean evaluate( - final CacheEntryEvent<? extends Integer, ? extends String> evt) throws CacheEntryListenerException { + final CacheEntryEvent<? extends Integer, ? extends T> evt) throws CacheEntryListenerException { return true; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryMarshallerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryMarshallerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryMarshallerTest.java new file mode 100644 index 0000000..44dcc1c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryMarshallerTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamException; +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.cache.Cache; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.custom.DummyEventFilterFactory; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Checks that Optimized Marshaller is not used on any stage of Continuous Query handling. + */ +public class ContinuousQueryMarshallerTest extends GridCommonAbstractTest { + /** */ + public static final String CACHE_NAME = "test-cache"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClientMode(gridName.contains("client")); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteFilterFactoryClient() throws Exception { + check("server", "client"); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteFilterFactoryServer() throws Exception { + check("server1", "server2"); + } + + /** + * @param node1Name Node 1 name. + * @param node2Name Node 2 name. + */ + private void check(String node1Name, String node2Name) throws Exception { + final Ignite node1 = startGrid(node1Name); + + final IgniteCache<Integer, MarshallerCheckingEntry> cache = node1.getOrCreateCache(CACHE_NAME); + + for (int i = 0; i < 10; i++) + cache.put(i, new MarshallerCheckingEntry(String.valueOf(i))); + + final Ignite node2 = startGrid(node2Name); + + final ContinuousQuery<Integer, MarshallerCheckingEntry> qry = new ContinuousQuery<>(); + + ScanQuery<Integer, MarshallerCheckingEntry> scanQry = new ScanQuery<>(new IgniteBiPredicate<Integer, MarshallerCheckingEntry>() { + @Override public boolean apply(Integer key, MarshallerCheckingEntry val) { + return key % 2 == 0; + } + }); + + qry.setInitialQuery(scanQry); + + qry.setRemoteFilterFactory(new DummyEventFilterFactory<>()); + + final CountDownLatch latch = new CountDownLatch(15); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, MarshallerCheckingEntry>() { + @Override public void onUpdated( + final Iterable<CacheEntryEvent<? extends Integer, ? extends MarshallerCheckingEntry>> evts) + throws CacheEntryListenerException { + + System.out.println(">> Client 1 events " + evts); + + for (CacheEntryEvent<? extends Integer, ? extends MarshallerCheckingEntry> evt : evts) + latch.countDown(); + } + }); + + final IgniteCache<Integer, MarshallerCheckingEntry> cache1 = node2.cache(CACHE_NAME); + + for (Cache.Entry<Integer, MarshallerCheckingEntry> entry : cache1.query(qry)) { + latch.countDown(); + System.out.println(">> Initial entry " + entry); + } + + for (int i = 10; i < 20; i++) + cache1.put(i, new MarshallerCheckingEntry(i)); + + assertTrue(Long.toString(latch.getCount()), latch.await(5, TimeUnit.SECONDS)); + } + + /** Checks that OptimizedMarshaller is never used (BinaryMarshaller is OK) */ + private class MarshallerCheckingEntry implements Serializable, Binarylizable { + /** */ + private Object val; + + /** */ + public MarshallerCheckingEntry(Object val) { + this.val = val; + } + + /** */ + private void writeObject(ObjectOutputStream out) throws IOException { + throw new UnsupportedOperationException(); + } + + /** */ + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException(); + } + + /** */ + private void readObjectNoData() throws ObjectStreamException { + throw new UnsupportedOperationException(); + } + + /** */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + writer.writeObject("value", val); + } + + /** */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + val = reader.readObject("value"); + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java index e5d1d60..73d8d0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java @@ -91,8 +91,8 @@ public class ContinuousQueryPeerClassLoadingTest extends GridCommonAbstractTest final ContinuousQuery<Integer, String> qry1 = new ContinuousQuery<>(); final ContinuousQuery<Integer, String> qry2 = new ContinuousQuery<>(); - qry1.setRemoteFilterFactory(new DummyEventFilterFactory()); - qry2.setRemoteFilterFactory(new DummyEventFilterFactory()); + qry1.setRemoteFilterFactory(new DummyEventFilterFactory<>()); + qry2.setRemoteFilterFactory(new DummyEventFilterFactory<>()); final AtomicInteger client1Evts = new AtomicInteger(0); final AtomicInteger client2Evts = new AtomicInteger(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index e810d30..08511d9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest; import org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest; +import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryMarshallerTest; import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryPeerClassLoadingTest; import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; @@ -132,6 +133,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(ClientReconnectContinuousQueryTest.class); suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class); suite.addTestSuite(ClientReconnectContinuousQueryTest.class); + suite.addTestSuite(ContinuousQueryMarshallerTest.class); suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class); suite.addTestSuite(CacheContinuousQueryEventBufferTest.class);
