Repository: ignite
Updated Branches:
  refs/heads/master 414cc58be -> 046657baf


IGNITE-2865 Continuous query event passed to filter should be immutable for 
users. This closes #744.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/046657ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/046657ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/046657ba

Branch: refs/heads/master
Commit: 046657baf65a9bf3dfdf29a66145f9ad2c5b3698
Parents: 414cc58
Author: sboikov <[email protected]>
Authored: Tue May 24 14:31:43 2016 +0300
Committer: sboikov <[email protected]>
Committed: Tue May 24 14:31:43 2016 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    | 133 ++++++------
 .../ignite/internal/GridCodegenConverter.java   |  56 +++++
 .../continuous/CacheContinuousQueryEntry.java   |  24 ++-
 ...eCacheContinuousQueryImmutableEntryTest.java | 205 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 5 files changed, 351 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git 
a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
 
b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 587ad06..a6ae0da 100644
--- 
a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ 
b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import org.apache.ignite.internal.GridCodegenConverter;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -544,8 +545,14 @@ public class MessageCodeGenerator {
 
         indent++;
 
-        returnFalseIfWriteFailed(field.getType(), field.getName(), colAnn != 
null ? colAnn.value() : null,
-            mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? 
mapAnn.valueType() : null, false);
+        GridCodegenConverter fldPreproc = 
field.getAnnotation(GridCodegenConverter.class);
+
+        String getExp = (fldPreproc != null && !fldPreproc.get().isEmpty())? 
fldPreproc.get(): field.getName();
+        Class<?> writeType = (fldPreproc != null && 
!fldPreproc.type().equals(GridCodegenConverter.Default.class))?
+            fldPreproc.type(): field.getType();
+
+        returnFalseIfWriteFailed(writeType, field.getName(), colAnn != null ? 
colAnn.value() : null,
+            mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? 
mapAnn.valueType() : null, false, getExp);
 
         write.add(EMPTY);
         write.add(builder().a("writer.incrementState();").toString());
@@ -569,8 +576,13 @@ public class MessageCodeGenerator {
 
         indent++;
 
-        returnFalseIfReadFailed(field.getType(), field.getName(), colAnn != 
null ? colAnn.value() : null,
-            mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? 
mapAnn.valueType() : null);
+        GridCodegenConverter fldPreproc = 
field.getAnnotation(GridCodegenConverter.class);
+        String setExp = (fldPreproc != null && !fldPreproc.get().isEmpty())? 
fldPreproc.set(): "";
+        Class<?> writeType = (fldPreproc != null && 
!fldPreproc.type().equals(GridCodegenConverter.Default.class))?
+            fldPreproc.type(): field.getType();
+
+        returnFalseIfReadFailed(writeType, field.getName(), colAnn != null ? 
colAnn.value() : null,
+            mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? 
mapAnn.valueType() : null, setExp);
 
         read.add(EMPTY);
         read.add(builder().a("reader.incrementState();").toString());
@@ -588,74 +600,74 @@ public class MessageCodeGenerator {
      * @param raw Raw write flag.
      */
     private void returnFalseIfWriteFailed(Class<?> type, String name, 
@Nullable Class<?> colItemType,
-        @Nullable Class<?> mapKeyType, @Nullable Class<?> mapValType, boolean 
raw) {
+        @Nullable Class<?> mapKeyType, @Nullable Class<?> mapValType, boolean 
raw, String getExpr) {
         assert type != null;
         assert name != null;
 
         String field = raw ? "null" : '"' + name + '"';
 
         if (type == byte.class)
-            returnFalseIfFailed(write, "writer.writeByte", field, name);
+            returnFalseIfFailed(write, "writer.writeByte", field, getExpr);
         else if (type == short.class)
-            returnFalseIfFailed(write, "writer.writeShort", field, name);
+            returnFalseIfFailed(write, "writer.writeShort", field, getExpr);
         else if (type == int.class)
-            returnFalseIfFailed(write, "writer.writeInt", field, name);
+            returnFalseIfFailed(write, "writer.writeInt", field, getExpr);
         else if (type == long.class)
-            returnFalseIfFailed(write, "writer.writeLong", field, name);
+            returnFalseIfFailed(write, "writer.writeLong", field, getExpr);
         else if (type == float.class)
-            returnFalseIfFailed(write, "writer.writeFloat", field, name);
+            returnFalseIfFailed(write, "writer.writeFloat", field, getExpr);
         else if (type == double.class)
-            returnFalseIfFailed(write, "writer.writeDouble", field, name);
+            returnFalseIfFailed(write, "writer.writeDouble", field, getExpr);
         else if (type == char.class)
-            returnFalseIfFailed(write, "writer.writeChar", field, name);
+            returnFalseIfFailed(write, "writer.writeChar", field, getExpr);
         else if (type == boolean.class)
-            returnFalseIfFailed(write, "writer.writeBoolean", field, name);
+            returnFalseIfFailed(write, "writer.writeBoolean", field, getExpr);
         else if (type == byte[].class)
-            returnFalseIfFailed(write, "writer.writeByteArray", field, name);
+            returnFalseIfFailed(write, "writer.writeByteArray", field, 
getExpr);
         else if (type == short[].class)
-            returnFalseIfFailed(write, "writer.writeShortArray", field, name);
+            returnFalseIfFailed(write, "writer.writeShortArray", field, 
getExpr);
         else if (type == int[].class)
-            returnFalseIfFailed(write, "writer.writeIntArray", field, name);
+            returnFalseIfFailed(write, "writer.writeIntArray", field, getExpr);
         else if (type == long[].class)
-            returnFalseIfFailed(write, "writer.writeLongArray", field, name);
+            returnFalseIfFailed(write, "writer.writeLongArray", field, 
getExpr);
         else if (type == float[].class)
-            returnFalseIfFailed(write, "writer.writeFloatArray", field, name);
+            returnFalseIfFailed(write, "writer.writeFloatArray", field, 
getExpr);
         else if (type == double[].class)
-            returnFalseIfFailed(write, "writer.writeDoubleArray", field, name);
+            returnFalseIfFailed(write, "writer.writeDoubleArray", field, 
getExpr);
         else if (type == char[].class)
-            returnFalseIfFailed(write, "writer.writeCharArray", field, name);
+            returnFalseIfFailed(write, "writer.writeCharArray", field, 
getExpr);
         else if (type == boolean[].class)
-            returnFalseIfFailed(write, "writer.writeBooleanArray", field, 
name);
+            returnFalseIfFailed(write, "writer.writeBooleanArray", field, 
getExpr);
         else if (type == String.class)
-            returnFalseIfFailed(write, "writer.writeString", field, name);
+            returnFalseIfFailed(write, "writer.writeString", field, getExpr);
         else if (type == BitSet.class)
-            returnFalseIfFailed(write, "writer.writeBitSet", field, name);
+            returnFalseIfFailed(write, "writer.writeBitSet", field, getExpr);
         else if (type == UUID.class)
-            returnFalseIfFailed(write, "writer.writeUuid", field, name);
+            returnFalseIfFailed(write, "writer.writeUuid", field, getExpr);
         else if (type == IgniteUuid.class)
-            returnFalseIfFailed(write, "writer.writeIgniteUuid", field, name);
+            returnFalseIfFailed(write, "writer.writeIgniteUuid", field, 
getExpr);
         else if (type.isEnum()) {
-            String arg = name + " != null ? (byte)" + name + ".ordinal() : -1";
+            String arg = getExpr + " != null ? (byte)" + getExpr + ".ordinal() 
: -1";
 
             returnFalseIfFailed(write, "writer.writeByte", field, arg);
         }
         else if (BASE_CLS.isAssignableFrom(type))
-            returnFalseIfFailed(write, "writer.writeMessage", field, name);
+            returnFalseIfFailed(write, "writer.writeMessage", field, getExpr);
         else if (type.isArray()) {
-            returnFalseIfFailed(write, "writer.writeObjectArray", field, name,
+            returnFalseIfFailed(write, "writer.writeObjectArray", field, 
getExpr,
                 "MessageCollectionItemType." + 
typeEnum(type.getComponentType()));
         }
         else if (Collection.class.isAssignableFrom(type) && 
!Set.class.isAssignableFrom(type)) {
             assert colItemType != null;
 
-            returnFalseIfFailed(write, "writer.writeCollection", field, name,
+            returnFalseIfFailed(write, "writer.writeCollection", field, 
getExpr,
                 "MessageCollectionItemType." + typeEnum(colItemType));
         }
         else if (Map.class.isAssignableFrom(type)) {
             assert mapKeyType != null;
             assert mapValType != null;
 
-            returnFalseIfFailed(write, "writer.writeMap", field, name,
+            returnFalseIfFailed(write, "writer.writeMap", field, getExpr,
                 "MessageCollectionItemType." + typeEnum(mapKeyType),
                 "MessageCollectionItemType." + typeEnum(mapValType));
         }
@@ -671,75 +683,75 @@ public class MessageCodeGenerator {
      * @param mapValType Map value type.
      */
     private void returnFalseIfReadFailed(Class<?> type, @Nullable String name, 
@Nullable Class<?> colItemType,
-        @Nullable Class<?> mapKeyType, @Nullable Class<?> mapValType) {
+        @Nullable Class<?> mapKeyType, @Nullable Class<?> mapValType, String 
setExpr) {
         assert type != null;
 
         String field = '"' + name + '"';
 
         if (type == byte.class)
-            returnFalseIfReadFailed(name, "reader.readByte", field);
+            returnFalseIfReadFailed(name, "reader.readByte", setExpr, field);
         else if (type == short.class)
-            returnFalseIfReadFailed(name, "reader.readShort", field);
+            returnFalseIfReadFailed(name, "reader.readShort", setExpr, field);
         else if (type == int.class)
-            returnFalseIfReadFailed(name, "reader.readInt", field);
+            returnFalseIfReadFailed(name, "reader.readInt", setExpr, field);
         else if (type == long.class)
-            returnFalseIfReadFailed(name, "reader.readLong", field);
+            returnFalseIfReadFailed(name, "reader.readLong", setExpr, field);
         else if (type == float.class)
-            returnFalseIfReadFailed(name, "reader.readFloat", field);
+            returnFalseIfReadFailed(name, "reader.readFloat", setExpr, field);
         else if (type == double.class)
-            returnFalseIfReadFailed(name, "reader.readDouble", field);
+            returnFalseIfReadFailed(name, "reader.readDouble", setExpr, field);
         else if (type == char.class)
-            returnFalseIfReadFailed(name, "reader.readChar", field);
+            returnFalseIfReadFailed(name, "reader.readChar", setExpr, field);
         else if (type == boolean.class)
-            returnFalseIfReadFailed(name, "reader.readBoolean", field);
+            returnFalseIfReadFailed(name, "reader.readBoolean", setExpr, 
field);
         else if (type == byte[].class)
-            returnFalseIfReadFailed(name, "reader.readByteArray", field);
+            returnFalseIfReadFailed(name, "reader.readByteArray", setExpr, 
field);
         else if (type == short[].class)
-            returnFalseIfReadFailed(name, "reader.readShortArray", field);
+            returnFalseIfReadFailed(name, "reader.readShortArray", setExpr, 
field);
         else if (type == int[].class)
-            returnFalseIfReadFailed(name, "reader.readIntArray", field);
+            returnFalseIfReadFailed(name, "reader.readIntArray", setExpr, 
field);
         else if (type == long[].class)
-            returnFalseIfReadFailed(name, "reader.readLongArray", field);
+            returnFalseIfReadFailed(name, "reader.readLongArray", setExpr, 
field);
         else if (type == float[].class)
-            returnFalseIfReadFailed(name, "reader.readFloatArray", field);
+            returnFalseIfReadFailed(name, "reader.readFloatArray", setExpr, 
field);
         else if (type == double[].class)
-            returnFalseIfReadFailed(name, "reader.readDoubleArray", field);
+            returnFalseIfReadFailed(name, "reader.readDoubleArray", setExpr, 
field);
         else if (type == char[].class)
-            returnFalseIfReadFailed(name, "reader.readCharArray", field);
+            returnFalseIfReadFailed(name, "reader.readCharArray", setExpr, 
field);
         else if (type == boolean[].class)
-            returnFalseIfReadFailed(name, "reader.readBooleanArray", field);
+            returnFalseIfReadFailed(name, "reader.readBooleanArray", setExpr, 
field);
         else if (type == String.class)
-            returnFalseIfReadFailed(name, "reader.readString", field);
+            returnFalseIfReadFailed(name, "reader.readString", setExpr, field);
         else if (type == BitSet.class)
-            returnFalseIfReadFailed(name, "reader.readBitSet", field);
+            returnFalseIfReadFailed(name, "reader.readBitSet", setExpr, field);
         else if (type == UUID.class)
-            returnFalseIfReadFailed(name, "reader.readUuid", field);
+            returnFalseIfReadFailed(name, "reader.readUuid", setExpr, field);
         else if (type == IgniteUuid.class)
-            returnFalseIfReadFailed(name, "reader.readIgniteUuid", field);
+            returnFalseIfReadFailed(name, "reader.readIgniteUuid", setExpr, 
field);
         else if (type.isEnum()) {
             String loc = name + "Ord";
 
             read.add(builder().a("byte ").a(loc).a(";").toString());
             read.add(EMPTY);
 
-            returnFalseIfReadFailed(loc, "reader.readByte", field);
+            returnFalseIfReadFailed(loc, "reader.readByte", setExpr, field);
 
             read.add(EMPTY);
             read.add(builder().a(name).a(" = 
").a(type.getSimpleName()).a(".fromOrdinal(").a(loc).a(");").toString());
         }
         else if (BASE_CLS.isAssignableFrom(type))
-            returnFalseIfReadFailed(name, "reader.readMessage", field);
+            returnFalseIfReadFailed(name, "reader.readMessage", setExpr, 
field);
         else if (type.isArray()) {
             Class<?> compType = type.getComponentType();
 
-            returnFalseIfReadFailed(name, "reader.readObjectArray", field,
+            returnFalseIfReadFailed(name, "reader.readObjectArray", field, 
setExpr,
                 "MessageCollectionItemType." + typeEnum(compType),
                 compType.getSimpleName() + ".class");
         }
         else if (Collection.class.isAssignableFrom(type) && 
!Set.class.isAssignableFrom(type)) {
             assert colItemType != null;
 
-            returnFalseIfReadFailed(name, "reader.readCollection", field,
+            returnFalseIfReadFailed(name, "reader.readCollection", field, 
setExpr,
                 "MessageCollectionItemType." + typeEnum(colItemType));
         }
         else if (Map.class.isAssignableFrom(type)) {
@@ -748,7 +760,7 @@ public class MessageCodeGenerator {
 
             boolean linked = type.equals(LinkedHashMap.class);
 
-            returnFalseIfReadFailed(name, "reader.readMap", field,
+            returnFalseIfReadFailed(name, "reader.readMap", field, setExpr,
                 "MessageCollectionItemType." + typeEnum(mapKeyType),
                 "MessageCollectionItemType." + typeEnum(mapValType),
                 linked ? "true" : "false");
@@ -762,7 +774,7 @@ public class MessageCodeGenerator {
      * @param mtd Method name.
      * @param args Method arguments.
      */
-    private void returnFalseIfReadFailed(String var, String mtd, @Nullable 
String... args) {
+    private void returnFalseIfReadFailed(String var, String mtd, String 
setConverter, @Nullable String... args) {
         assert mtd != null;
 
         String argsStr = "";
@@ -774,7 +786,12 @@ public class MessageCodeGenerator {
             argsStr = argsStr.substring(0, argsStr.length() - 2);
         }
 
-        read.add(builder().a(var).a(" = 
").a(mtd).a("(").a(argsStr).a(");").toString());
+        if (setConverter.isEmpty())
+            read.add(builder().a(var).a(" = 
").a(mtd).a("(").a(argsStr).a(");").toString());
+        else {
+            read.add(builder().a(var).a(" = ").a(setConverter
+                .replace("$val$", new 
SB().a(mtd).a("(").a(argsStr).a(")").toString())).a(";").toString());
+        }
         read.add(EMPTY);
 
         read.add(builder().a("if (!reader.isLastRead())").toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java
new file mode 100644
index 0000000..7baccda
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotates fields that require custom reading / writing.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface GridCodegenConverter {
+    /**
+     * Converter code. Replaces the original field value with the converted one.
+     *
+     * @return Code to get field value.
+     */
+    String get() default "";
+
+    /**
+     * Converter code. The raw read value is available via the $val$ placeholder.
+     *
+     * @return Code to set field value.
+     */
+    String set() default "";
+
+    /**
+     * @return Result type. Use this type to read / write the converted value.
+     */
+    Class<?> type() default Default.class;
+
+    /**
+     * Used as the default value for the converter type.
+     */
+    class Default {
+        // No-op.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index d105271..63dc4cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.cache.query.continuous;
 import java.nio.ByteBuffer;
 import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridCodegenConverter;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -61,18 +62,26 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
     }
 
     /** */
+    @GridCodegenConverter(
+        type = byte.class,
+        get = "evtType != null ? (byte)evtType.ordinal() : -1",
+        set = "eventTypeFromOrdinal($val$)"
+    )
     private EventType evtType;
 
     /** Key. */
     @GridToStringInclude
+    @GridCodegenConverter(get = "isFiltered() ? null : key")
     private KeyCacheObject key;
 
     /** New value. */
     @GridToStringInclude
+    @GridCodegenConverter(get = "isFiltered() ? null : newVal")
     private CacheObject newVal;
 
     /** Old value. */
     @GridToStringInclude
+    @GridCodegenConverter(get = "isFiltered() ? null : oldVal")
     private CacheObject oldVal;
 
     /** Cache name. */
@@ -187,9 +196,6 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
      */
     void markFiltered() {
         flags |= FILTERED_ENTRY;
-        newVal = null;
-        oldVal = null;
-        key = null;
         depInfo = null;
     }
 
@@ -345,19 +351,19 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMessage("key", key))
+                if (!writer.writeMessage("key", isFiltered() ? null : key))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeMessage("newVal", newVal))
+                if (!writer.writeMessage("newVal", isFiltered() ? null : 
newVal))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeMessage("oldVal", oldVal))
+                if (!writer.writeMessage("oldVal", isFiltered() ? null : 
oldVal))
                     return false;
 
                 writer.incrementState();
@@ -402,15 +408,11 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 1:
-                byte evtTypeOrd;
-
-                evtTypeOrd = reader.readByte("evtType");
+                evtType = eventTypeFromOrdinal(reader.readByte("evtType"));
 
                 if (!reader.isLastRead())
                     return false;
 
-                evtType = eventTypeFromOrdinal(evtTypeOrd);
-
                 reader.incrementState();
 
             case 2:

http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
new file mode 100644
index 0000000..66d727c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryImmutableEntryTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Keys count. */
+    private static final int KEYS_COUNT = 10;
+
+    /** Grid count. */
+    private static final int GRID_COUNT = 3;
+
+    /** Events. */
+    private static final ConcurrentLinkedQueue<CacheEntryEvent<?, ?>> events = 
new ConcurrentLinkedQueue<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        events.clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventAvailabilityScope() throws Exception {
+        startGrids(GRID_COUNT);
+
+        final CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+        qry.setLocalListener(lsnr);
+        qry.setRemoteFilterFactory(new FilterFactory());
+
+        Object keys[] = new Object[GRID_COUNT];
+
+        // Add initial values.
+        for (int i = 0; i < GRID_COUNT; ++i) {
+            keys[i] = primaryKey(grid(i).cache(null));
+
+            grid(0).cache(null).put(keys[i], -1);
+        }
+
+        try (QueryCursor<?> cur = grid(0).cache(null).query(qry)) {
+            // Replace values on the keys.
+            for (int i = 0; i < KEYS_COUNT; i++) {
+                log.info("Put key: " + i);
+
+                grid(i % GRID_COUNT).cache(null).put(keys[i % GRID_COUNT], i);
+            }
+        }
+
+        assertTrue("There are not filtered events", !events.isEmpty());
+
+        for (CacheEntryEvent<?, ?> event : events) {
+            assertNotNull("Key is null", event.getKey());
+            assertNotNull("Value is null", event.getValue());
+            assertNotNull("Old value is null", event.getOldValue());
+        }
+    }
+
+    /**
+     *
+     */
+    public void testCacheContinuousQueryEntrySerialization() {
+        CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+            1,
+            EventType.UPDATED,
+            new KeyCacheObjectImpl(1, new byte[] {0, 0, 0, 1}),
+            new CacheObjectImpl(2, new byte[] {0, 0, 0, 2}),
+            new CacheObjectImpl(2, new byte[] {0, 0, 0, 3}),
+            true,
+            1,
+            1L,
+            new AffinityTopologyVersion(1L));
+
+        e0.filteredEvents(new GridLongList(new long[]{1L, 2L}));
+        e0.markFiltered();
+
+        ByteBuffer buf = ByteBuffer.allocate(4096);
+        DirectMessageWriter writer = new DirectMessageWriter((byte)1);
+
+        // Skip writing the class header.
+        writer.onHeaderWritten();
+        e0.writeTo(buf, writer);
+
+        CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry();
+        e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(new 
GridIoMessageFactory(null), (byte)1));
+
+        assertEquals(e0.cacheId(), e1.cacheId());
+        assertEquals(e0.eventType(), e1.eventType());
+        assertEquals(e0.isFiltered(), e1.isFiltered());
+        assertEquals(GridLongList.asList(e0.filteredEvents()), 
GridLongList.asList(e1.filteredEvents()));
+        assertEquals(e0.isBackup(), e1.isBackup());
+        assertEquals(e0.isKeepBinary(), e1.isKeepBinary());
+        assertEquals(e0.partition(), e1.partition());
+        assertEquals(e0.updateCounter(), e1.updateCounter());
+
+        // Key and value shouldn't be serialized in case an event is filtered.
+        assertNull(e1.key());
+        assertNotNull(e0.key());
+        assertNull(e1.oldValue());
+        assertNotNull(e0.oldValue());
+        assertNull(e1.value());
+        assertNotNull(e0.value());
+    }
+
+    /**
+     *
+     */
+    private static class FilterFactory implements 
Factory<CacheEntryEventFilter<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventFilter<Object, Object> create() {
+            return new CacheEventFilter();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventFilter implements 
CacheEntryEventFilter<Object, Object>, Serializable {
+        /** {@inheritDoc} */
+         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
+            events.add(evt);
+
+            return false;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventListener implements 
CacheEntryUpdatedListener<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            // No-op
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index e0e81b7..dbbb3ed 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import 
org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
+import 
org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryImmutableEntryTest;
 
 /**
  * Test suite for cache queries.
@@ -103,6 +104,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends 
TestSuite {
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
         suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class);
         suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryImmutableEntryTest.class);
 
         return suite;
     }

Reply via email to