Repository: ignite
Updated Branches:
  refs/heads/master f4c18f114 -> 9bfc823e0


IGNITE-9054 Avoid using OptimizedMarshaller with initial ScanQuery. - Fixes #4592.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9bfc823e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9bfc823e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9bfc823e

Branch: refs/heads/master
Commit: 9bfc823e012b052565a32ab1c82118fe0aff950f
Parents: f4c18f1
Author: Ilya Kasnacheev <[email protected]>
Authored: Tue Sep 4 17:26:58 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Tue Sep 4 18:17:27 2018 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryManager.java      |   4 +-
 .../query/GridCacheQueryResponseEntry.java      |   5 +-
 .../ignite/custom/DummyEventFilterFactory.java  |  10 +-
 .../ContinuousQueryMarshallerTest.java          | 168 +++++++++++++++++++
 .../ContinuousQueryPeerClassLoadingTest.java    |   4 +-
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 6 files changed, 183 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 281400e..982006f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1278,7 +1278,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                                 continue;
                         }
                         else
-                            data.add(!loc ? new 
GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
+                            data.add(new T2<>(key, val));
                     }
 
                     if (!loc) {
@@ -3119,7 +3119,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                             }
                         }
                         else
-                            next0 = !locNode ? new 
GridCacheQueryResponseEntry<>(key0, val0):
+                            next0 = !locNode ? new T2<>(key0, val0):
                                 new CacheQueryEntry<>(key0, val0);
 
                         break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java
index 2c1a4f5..650f0c0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponseEntry.java
@@ -27,7 +27,10 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringInclude;
 /**
  * Class to store query results returned by remote nodes. It's required to 
fully
  * control serialization process. Local entries can be returned to user as is.
+ * <p>
+ * @deprecated Should be removed in Apache Ignite 3.0.
  */
+@Deprecated
 public class GridCacheQueryResponseEntry<K, V> implements Map.Entry<K, V>, 
Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
@@ -113,4 +116,4 @@ public class GridCacheQueryResponseEntry<K, V> implements 
Map.Entry<K, V>, Exter
     @Override public String toString() {
         return "[" + key + "=" + val + "]";
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
 
b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
index e0688bc..103e6a8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
@@ -25,22 +25,22 @@ import javax.cache.event.CacheEntryListenerException;
 /**
  * Must be not in org.apache.ignite.internal
  */
-public class DummyEventFilterFactory implements 
Factory<CacheEntryEventFilter<Integer, String>> {
+public class DummyEventFilterFactory<T> implements 
Factory<CacheEntryEventFilter<Integer, T>> {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** {@inheritDoc} */
-    @Override public CacheEntryEventFilter<Integer, String> create() {
-        return new DummyEventFilter();
+    @Override public CacheEntryEventFilter<Integer, T> create() {
+        return new DummyEventFilter<T>();
     }
 
     /**
      *
      */
-    private static class DummyEventFilter implements 
CacheEntryEventFilter<Integer, String> {
+    private static class DummyEventFilter<T> implements 
CacheEntryEventFilter<Integer, T> {
         /** {@inheritDoc} */
         @Override public boolean evaluate(
-            final CacheEntryEvent<? extends Integer, ? extends String> evt) 
throws CacheEntryListenerException {
+            final CacheEntryEvent<? extends Integer, ? extends T> evt) throws 
CacheEntryListenerException {
             return true;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryMarshallerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryMarshallerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryMarshallerTest.java
new file mode 100644
index 0000000..44dcc1c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryMarshallerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.custom.DummyEventFilterFactory;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that Optimized Marshaller is not used on any stage of Continuous Query handling.
+ */
+public class ContinuousQueryMarshallerTest extends GridCommonAbstractTest {
+    /** Name of the cache the continuous query is run against. */
+    public static final String CACHE_NAME = "test-cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(gridName.contains("client"));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteFilterFactoryClient() throws Exception {
+        check("server", "client");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteFilterFactoryServer() throws Exception {
+        check("server1", "server2");
+    }
+
+    /**
+     * @param node1Name Name of the node that creates the cache and puts keys 0..9.
+     * @param node2Name Name of the node that runs the query and puts keys 10..19.
+     */
+    private void check(String node1Name, String node2Name) throws Exception {
+        final Ignite node1 = startGrid(node1Name);
+
+        final IgniteCache<Integer, MarshallerCheckingEntry> cache = node1.getOrCreateCache(CACHE_NAME);
+
+        for (int i = 0; i < 10; i++)
+            cache.put(i, new MarshallerCheckingEntry(String.valueOf(i)));
+
+        final Ignite node2 = startGrid(node2Name);
+
+        final ContinuousQuery<Integer, MarshallerCheckingEntry> qry = new ContinuousQuery<>();
+
+        ScanQuery<Integer, MarshallerCheckingEntry> scanQry = new ScanQuery<>(new IgniteBiPredicate<Integer, MarshallerCheckingEntry>() {
+            @Override public boolean apply(Integer key, MarshallerCheckingEntry val) {
+                return key % 2 == 0;
+            }
+        });
+
+        qry.setInitialQuery(scanQry);
+
+        qry.setRemoteFilterFactory(new DummyEventFilterFactory<>());
+
+        final CountDownLatch latch = new CountDownLatch(15); // 5 initial-scan entries (even keys 0..8) + 10 update events.
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, MarshallerCheckingEntry>() {
+            @Override public void onUpdated(
+                final Iterable<CacheEntryEvent<? extends Integer, ? extends MarshallerCheckingEntry>> evts)
+                throws CacheEntryListenerException {
+
+                System.out.println(">> Client 1 events " + evts);
+
+                for (CacheEntryEvent<? extends Integer, ? extends MarshallerCheckingEntry> evt : evts)
+                    latch.countDown();
+            }
+        });
+
+        final IgniteCache<Integer, MarshallerCheckingEntry> cache1 = node2.cache(CACHE_NAME);
+
+        for (Cache.Entry<Integer, MarshallerCheckingEntry> entry : cache1.query(qry)) {
+            latch.countDown();
+            System.out.println(">> Initial entry " + entry);
+        }
+
+        for (int i = 10; i < 20; i++)
+            cache1.put(i, new MarshallerCheckingEntry(i));
+
+        assertTrue(Long.toString(latch.getCount()), latch.await(5, TimeUnit.SECONDS));
+    }
+
+    /** Checks that OptimizedMarshaller is never used (BinaryMarshaller is OK) */
+    private static class MarshallerCheckingEntry implements Serializable, Binarylizable {
+        /** Payload carried by the entry. */
+        private Object val;
+
+        /** @param val Payload to wrap. */
+        public MarshallerCheckingEntry(Object val) {
+            this.val = val;
+        }
+
+        /** Always fails: writing this entry through the Java-serialization hook is not allowed. */
+        private void writeObject(ObjectOutputStream out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** Always fails: reading this entry through the Java-serialization hook is not allowed. */
+        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** Always fails, like the other Java-serialization hooks of this class. */
+        private void readObjectNoData() throws ObjectStreamException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+            writer.writeObject("value", val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+            val = reader.readObject("value");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
index e5d1d60..73d8d0d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
@@ -91,8 +91,8 @@ public class ContinuousQueryPeerClassLoadingTest extends 
GridCommonAbstractTest
         final ContinuousQuery<Integer, String> qry1 = new ContinuousQuery<>();
         final ContinuousQuery<Integer, String> qry2 = new ContinuousQuery<>();
 
-        qry1.setRemoteFilterFactory(new DummyEventFilterFactory());
-        qry2.setRemoteFilterFactory(new DummyEventFilterFactory());
+        qry1.setRemoteFilterFactory(new DummyEventFilterFactory<>());
+        qry2.setRemoteFilterFactory(new DummyEventFilterFactory<>());
 
         final AtomicInteger client1Evts = new AtomicInteger(0);
         final AtomicInteger client2Evts = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bfc823e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index e810d30..08511d9 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest;
+import 
org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryMarshallerTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryPeerClassLoadingTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
@@ -132,6 +133,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends 
TestSuite {
         suite.addTestSuite(ClientReconnectContinuousQueryTest.class);
         suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class);
         suite.addTestSuite(ClientReconnectContinuousQueryTest.class);
+        suite.addTestSuite(ContinuousQueryMarshallerTest.class);
 
         
suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class);
         suite.addTestSuite(CacheContinuousQueryEventBufferTest.class);

Reply via email to