Repository: ignite Updated Branches: refs/heads/master 414cc58be -> 046657baf
IGNITE-2865 Continuous query event passed to filter should be immutable for users. This closes #744. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/046657ba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/046657ba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/046657ba Branch: refs/heads/master Commit: 046657baf65a9bf3dfdf29a66145f9ad2c5b3698 Parents: 414cc58 Author: sboikov <[email protected]> Authored: Tue May 24 14:31:43 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue May 24 14:31:43 2016 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 133 ++++++------ .../ignite/internal/GridCodegenConverter.java | 56 +++++ .../continuous/CacheContinuousQueryEntry.java | 24 ++- ...eCacheContinuousQueryImmutableEntryTest.java | 205 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite3.java | 2 + 5 files changed, 351 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 587ad06..a6ae0da 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import org.apache.ignite.internal.GridCodegenConverter; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; @@ -544,8 +545,14 @@ public class MessageCodeGenerator { indent++; - returnFalseIfWriteFailed(field.getType(), field.getName(), colAnn != null ? colAnn.value() : null, - mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? mapAnn.valueType() : null, false); + GridCodegenConverter fldPreproc = field.getAnnotation(GridCodegenConverter.class); + + String getExp = (fldPreproc != null && !fldPreproc.get().isEmpty())? fldPreproc.get(): field.getName(); + Class<?> writeType = (fldPreproc != null && !fldPreproc.type().equals(GridCodegenConverter.Default.class))? + fldPreproc.type(): field.getType(); + + returnFalseIfWriteFailed(writeType, field.getName(), colAnn != null ? colAnn.value() : null, + mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? mapAnn.valueType() : null, false, getExp); write.add(EMPTY); write.add(builder().a("writer.incrementState();").toString()); @@ -569,8 +576,13 @@ public class MessageCodeGenerator { indent++; - returnFalseIfReadFailed(field.getType(), field.getName(), colAnn != null ? colAnn.value() : null, - mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? mapAnn.valueType() : null); + GridCodegenConverter fldPreproc = field.getAnnotation(GridCodegenConverter.class); + String setExp = (fldPreproc != null && !fldPreproc.get().isEmpty())? fldPreproc.set(): ""; + Class<?> writeType = (fldPreproc != null && !fldPreproc.type().equals(GridCodegenConverter.Default.class))? + fldPreproc.type(): field.getType(); + + returnFalseIfReadFailed(writeType, field.getName(), colAnn != null ? colAnn.value() : null, + mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? mapAnn.valueType() : null, setExp); read.add(EMPTY); read.add(builder().a("reader.incrementState();").toString()); @@ -588,74 +600,74 @@ public class MessageCodeGenerator { * @param raw Raw write flag. */ private void returnFalseIfWriteFailed(Class<?> type, String name, @Nullable Class<?> colItemType, - @Nullable Class<?> mapKeyType, @Nullable Class<?> mapValType, boolean raw) { + @Nullable Class<?> mapKeyType, @Nullable Class<?> mapValType, boolean raw, String getExpr) { assert type != null; assert name != null; String field = raw ? "null" : '"' + name + '"'; if (type == byte.class) - returnFalseIfFailed(write, "writer.writeByte", field, name); + returnFalseIfFailed(write, "writer.writeByte", field, getExpr); else if (type == short.class) - returnFalseIfFailed(write, "writer.writeShort", field, name); + returnFalseIfFailed(write, "writer.writeShort", field, getExpr); else if (type == int.class) - returnFalseIfFailed(write, "writer.writeInt", field, name); + returnFalseIfFailed(write, "writer.writeInt", field, getExpr); else if (type == long.class) - returnFalseIfFailed(write, "writer.writeLong", field, name); + returnFalseIfFailed(write, "writer.writeLong", field, getExpr); else if (type == float.class) - returnFalseIfFailed(write, "writer.writeFloat", field, name); + returnFalseIfFailed(write, "writer.writeFloat", field, getExpr); else if (type == double.class) - returnFalseIfFailed(write, "writer.writeDouble", field, name); + returnFalseIfFailed(write, "writer.writeDouble", field, getExpr); else if (type == char.class) - returnFalseIfFailed(write, "writer.writeChar", field, name); + returnFalseIfFailed(write, "writer.writeChar", field, getExpr); else if (type == boolean.class) - returnFalseIfFailed(write, "writer.writeBoolean", field, name); + returnFalseIfFailed(write, "writer.writeBoolean", field, getExpr); else if (type == byte[].class) - returnFalseIfFailed(write, "writer.writeByteArray", field, name); + returnFalseIfFailed(write, "writer.writeByteArray", field, getExpr); else if (type == short[].class) - returnFalseIfFailed(write, "writer.writeShortArray", field, name); + returnFalseIfFailed(write, "writer.writeShortArray", field, getExpr); else if (type == int[].class) - returnFalseIfFailed(write, "writer.writeIntArray", field, name); + returnFalseIfFailed(write, "writer.writeIntArray", field, getExpr); else if (type == long[].class) - returnFalseIfFailed(write, "writer.writeLongArray", field, name); + returnFalseIfFailed(write, "writer.writeLongArray", field, getExpr); else if (type == float[].class) - returnFalseIfFailed(write, "writer.writeFloatArray", field, name); + returnFalseIfFailed(write, "writer.writeFloatArray", field, getExpr); else if (type == double[].class) - returnFalseIfFailed(write, "writer.writeDoubleArray", field, name); + returnFalseIfFailed(write, "writer.writeDoubleArray", field, getExpr); else if (type == char[].class) - returnFalseIfFailed(write, "writer.writeCharArray", field, name); + returnFalseIfFailed(write, "writer.writeCharArray", field, getExpr); else if (type == boolean[].class) - returnFalseIfFailed(write, "writer.writeBooleanArray", field, name); + returnFalseIfFailed(write, "writer.writeBooleanArray", field, getExpr); else if (type == String.class) - returnFalseIfFailed(write, "writer.writeString", field, name); + returnFalseIfFailed(write, "writer.writeString", field, getExpr); else if (type == BitSet.class) - returnFalseIfFailed(write, "writer.writeBitSet", field, name); + returnFalseIfFailed(write, "writer.writeBitSet", field, getExpr); else if (type == UUID.class) - returnFalseIfFailed(write, "writer.writeUuid", field, name); + returnFalseIfFailed(write, "writer.writeUuid", field, getExpr); else if (type == IgniteUuid.class) - returnFalseIfFailed(write, "writer.writeIgniteUuid", field, name); + returnFalseIfFailed(write, "writer.writeIgniteUuid", field, getExpr); else if (type.isEnum()) { - String arg = name + " != null ? (byte)" + name + ".ordinal() : -1"; + String arg = getExpr + " != null ? (byte)" + getExpr + ".ordinal() : -1"; returnFalseIfFailed(write, "writer.writeByte", field, arg); } else if (BASE_CLS.isAssignableFrom(type)) - returnFalseIfFailed(write, "writer.writeMessage", field, name); + returnFalseIfFailed(write, "writer.writeMessage", field, getExpr); else if (type.isArray()) { - returnFalseIfFailed(write, "writer.writeObjectArray", field, name, + returnFalseIfFailed(write, "writer.writeObjectArray", field, getExpr, "MessageCollectionItemType." + typeEnum(type.getComponentType())); } else if (Collection.class.isAssignableFrom(type) && !Set.class.isAssignableFrom(type)) { assert colItemType != null; - returnFalseIfFailed(write, "writer.writeCollection", field, name, + returnFalseIfFailed(write, "writer.writeCollection", field, getExpr, "MessageCollectionItemType." + typeEnum(colItemType)); } else if (Map.class.isAssignableFrom(type)) { assert mapKeyType != null; assert mapValType != null; - returnFalseIfFailed(write, "writer.writeMap", field, name, + returnFalseIfFailed(write, "writer.writeMap", field, getExpr, "MessageCollectionItemType." + typeEnum(mapKeyType), "MessageCollectionItemType." + typeEnum(mapValType)); } @@ -671,75 +683,75 @@ public class MessageCodeGenerator { * @param mapValType Map value type. */ private void returnFalseIfReadFailed(Class<?> type, @Nullable String name, @Nullable Class<?> colItemType, - @Nullable Class<?> mapKeyType, @Nullable Class<?> mapValType) { + @Nullable Class<?> mapKeyType, @Nullable Class<?> mapValType, String setExpr) { assert type != null; String field = '"' + name + '"'; if (type == byte.class) - returnFalseIfReadFailed(name, "reader.readByte", field); + returnFalseIfReadFailed(name, "reader.readByte", setExpr, field); else if (type == short.class) - returnFalseIfReadFailed(name, "reader.readShort", field); + returnFalseIfReadFailed(name, "reader.readShort", setExpr, field); else if (type == int.class) - returnFalseIfReadFailed(name, "reader.readInt", field); + returnFalseIfReadFailed(name, "reader.readInt", setExpr, field); else if (type == long.class) - returnFalseIfReadFailed(name, "reader.readLong", field); + returnFalseIfReadFailed(name, "reader.readLong", setExpr, field); else if (type == float.class) - returnFalseIfReadFailed(name, "reader.readFloat", field); + returnFalseIfReadFailed(name, "reader.readFloat", setExpr, field); else if (type == double.class) - returnFalseIfReadFailed(name, "reader.readDouble", field); + returnFalseIfReadFailed(name, "reader.readDouble", setExpr, field); else if (type == char.class) - returnFalseIfReadFailed(name, "reader.readChar", field); + returnFalseIfReadFailed(name, "reader.readChar", setExpr, field); else if (type == boolean.class) - returnFalseIfReadFailed(name, "reader.readBoolean", field); + returnFalseIfReadFailed(name, "reader.readBoolean", setExpr, field); else if (type == byte[].class) - returnFalseIfReadFailed(name, "reader.readByteArray", field); + returnFalseIfReadFailed(name, "reader.readByteArray", setExpr, field); else if (type == short[].class) - returnFalseIfReadFailed(name, "reader.readShortArray", field); + returnFalseIfReadFailed(name, "reader.readShortArray", setExpr, field); else if (type == int[].class) - returnFalseIfReadFailed(name, "reader.readIntArray", field); + returnFalseIfReadFailed(name, "reader.readIntArray", setExpr, field); else if (type == long[].class) - returnFalseIfReadFailed(name, "reader.readLongArray", field); + returnFalseIfReadFailed(name, "reader.readLongArray", setExpr, field); else if (type == float[].class) - returnFalseIfReadFailed(name, "reader.readFloatArray", field); + returnFalseIfReadFailed(name, "reader.readFloatArray", setExpr, field); else if (type == double[].class) - returnFalseIfReadFailed(name, "reader.readDoubleArray", field); + returnFalseIfReadFailed(name, "reader.readDoubleArray", setExpr, field); else if (type == char[].class) - returnFalseIfReadFailed(name, "reader.readCharArray", field); + returnFalseIfReadFailed(name, "reader.readCharArray", setExpr, field); else if (type == boolean[].class) - returnFalseIfReadFailed(name, "reader.readBooleanArray", field); + returnFalseIfReadFailed(name, "reader.readBooleanArray", setExpr, field); else if (type == String.class) - returnFalseIfReadFailed(name, "reader.readString", field); + returnFalseIfReadFailed(name, "reader.readString", setExpr, field); else if (type == BitSet.class) - returnFalseIfReadFailed(name, "reader.readBitSet", field); + returnFalseIfReadFailed(name, "reader.readBitSet", setExpr, field); else if (type == UUID.class) - returnFalseIfReadFailed(name, "reader.readUuid", field); + returnFalseIfReadFailed(name, "reader.readUuid", setExpr, field); else if (type == IgniteUuid.class) - returnFalseIfReadFailed(name, "reader.readIgniteUuid", field); + returnFalseIfReadFailed(name, "reader.readIgniteUuid", setExpr, field); else if (type.isEnum()) { String loc = name + "Ord"; read.add(builder().a("byte ").a(loc).a(";").toString()); read.add(EMPTY); - returnFalseIfReadFailed(loc, "reader.readByte", field); + returnFalseIfReadFailed(loc, "reader.readByte", setExpr, field); read.add(EMPTY); read.add(builder().a(name).a(" = ").a(type.getSimpleName()).a(".fromOrdinal(").a(loc).a(");").toString()); } else if (BASE_CLS.isAssignableFrom(type)) - returnFalseIfReadFailed(name, "reader.readMessage", field); + returnFalseIfReadFailed(name, "reader.readMessage", setExpr, field); else if (type.isArray()) { Class<?> compType = type.getComponentType(); - returnFalseIfReadFailed(name, "reader.readObjectArray", field, + returnFalseIfReadFailed(name, "reader.readObjectArray", field, setExpr, "MessageCollectionItemType." + typeEnum(compType), compType.getSimpleName() + ".class"); } else if (Collection.class.isAssignableFrom(type) && !Set.class.isAssignableFrom(type)) { assert colItemType != null; - returnFalseIfReadFailed(name, "reader.readCollection", field, + returnFalseIfReadFailed(name, "reader.readCollection", field, setExpr, "MessageCollectionItemType." + typeEnum(colItemType)); } else if (Map.class.isAssignableFrom(type)) { @@ -748,7 +760,7 @@ public class MessageCodeGenerator { boolean linked = type.equals(LinkedHashMap.class); - returnFalseIfReadFailed(name, "reader.readMap", field, + returnFalseIfReadFailed(name, "reader.readMap", field, setExpr, "MessageCollectionItemType." + typeEnum(mapKeyType), "MessageCollectionItemType." + typeEnum(mapValType), linked ? "true" : "false"); @@ -762,7 +774,7 @@ public class MessageCodeGenerator { * @param mtd Method name. * @param args Method arguments. */ - private void returnFalseIfReadFailed(String var, String mtd, @Nullable String... args) { + private void returnFalseIfReadFailed(String var, String mtd, String setConverter, @Nullable String... args) { assert mtd != null; String argsStr = ""; @@ -774,7 +786,12 @@ public class MessageCodeGenerator { argsStr = argsStr.substring(0, argsStr.length() - 2); } - read.add(builder().a(var).a(" = ").a(mtd).a("(").a(argsStr).a(");").toString()); + if (setConverter.isEmpty()) + read.add(builder().a(var).a(" = ").a(mtd).a("(").a(argsStr).a(");").toString()); + else { + read.add(builder().a(var).a(" = ").a(setConverter + .replace("$val$", new SB().a(mtd).a("(").a(argsStr).a(")").toString())).a(";").toString()); + } read.add(EMPTY); read.add(builder().a("if (!reader.isLastRead())").toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java b/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java new file mode 100644 index 0000000..7baccda --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotates fields that required custom reading / writing. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface GridCodegenConverter { + /** + * Converter code. Replace original field value by converted. + * + * @return Code to get field value. + */ + String get() default ""; + + /** + * Converter code. A raw read value is available at $val$ placeholder + * + * @return Code to set field value. + */ + String set() default ""; + + /** + * @return Result type. Use this type to read / write the converted value. + */ + Class<?> type() default Default.class; + + /** + * Used as the default value for converter type + */ + class Default { + // No-op. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index d105271..63dc4cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.nio.ByteBuffer; import javax.cache.event.EventType; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridCodegenConverter; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -61,18 +62,26 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** */ + @GridCodegenConverter( + type = byte.class, + get = "evtType != null ? (byte)evtType.ordinal() : -1", + set = "eventTypeFromOrdinal($val$)" + ) private EventType evtType; /** Key. */ @GridToStringInclude + @GridCodegenConverter(get = "isFiltered() ? null : key") private KeyCacheObject key; /** New value. */ @GridToStringInclude + @GridCodegenConverter(get = "isFiltered() ? null : newVal") private CacheObject newVal; /** Old value. */ @GridToStringInclude + @GridCodegenConverter(get = "isFiltered() ? null : oldVal") private CacheObject oldVal; /** Cache name. */ @@ -187,9 +196,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { */ void markFiltered() { flags |= FILTERED_ENTRY; - newVal = null; - oldVal = null; - key = null; depInfo = null; } @@ -345,19 +351,19 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 5: - if (!writer.writeMessage("key", key)) + if (!writer.writeMessage("key", isFiltered() ? null : key)) return false; writer.incrementState(); case 6: - if (!writer.writeMessage("newVal", newVal)) + if (!writer.writeMessage("newVal", isFiltered() ? null : newVal)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("oldVal", oldVal)) + if (!writer.writeMessage("oldVal", isFiltered() ? null : oldVal)) return false; writer.incrementState(); @@ -402,15 +408,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 1: - byte evtTypeOrd; - - evtTypeOrd = reader.readByte("evtType"); + evtType = eventTypeFromOrdinal(reader.readByte("evtType")); if (!reader.isLastRead()) return false; - evtType = eventTypeFromOrdinal(evtTypeOrd); - reader.incrementState(); case 2: http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java new file mode 100644 index 0000000..66d727c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentLinkedQueue; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.event.EventType; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.direct.DirectMessageReader; +import org.apache.ignite.internal.direct.DirectMessageWriter; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Keys count. */ + private static final int KEYS_COUNT = 10; + + /** Grid count. */ + private static final int GRID_COUNT = 3; + + /** Events. */ + private static final ConcurrentLinkedQueue<CacheEntryEvent<?, ?>> events = new ConcurrentLinkedQueue<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + events.clear(); + } + + /** + * @throws Exception If failed. + */ + public void testEventAvailabilityScope() throws Exception { + startGrids(GRID_COUNT); + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + qry.setLocalListener(lsnr); + qry.setRemoteFilterFactory(new FilterFactory()); + + Object keys[] = new Object[GRID_COUNT]; + + // Add initial values. + for (int i = 0; i < GRID_COUNT; ++i) { + keys[i] = primaryKey(grid(i).cache(null)); + + grid(0).cache(null).put(keys[i], -1); + } + + try (QueryCursor<?> cur = grid(0).cache(null).query(qry)) { + // Replace values on the keys. + for (int i = 0; i < KEYS_COUNT; i++) { + log.info("Put key: " + i); + + grid(i % GRID_COUNT).cache(null).put(keys[i % GRID_COUNT], i); + } + } + + assertTrue("There are not filtered events", !events.isEmpty()); + + for (CacheEntryEvent<?, ?> event : events) { + assertNotNull("Key is null", event.getKey()); + assertNotNull("Value is null", event.getValue()); + assertNotNull("Old value is null", event.getOldValue()); + } + } + + /** + * + */ + public void testCacheContinuousQueryEntrySerialization() { + CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( + 1, + EventType.UPDATED, + new KeyCacheObjectImpl(1, new byte[] {0, 0, 0, 1}), + new CacheObjectImpl(2, new byte[] {0, 0, 0, 2}), + new CacheObjectImpl(2, new byte[] {0, 0, 0, 3}), + true, + 1, + 1L, + new AffinityTopologyVersion(1L)); + + e0.filteredEvents(new GridLongList(new long[]{1L, 2L})); + e0.markFiltered(); + + ByteBuffer buf = ByteBuffer.allocate(4096); + DirectMessageWriter writer = new DirectMessageWriter((byte)1); + + // Skip write class header. + writer.onHeaderWritten(); + e0.writeTo(buf, writer); + + CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry(); + e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(new GridIoMessageFactory(null), (byte)1)); + + assertEquals(e0.cacheId(), e1.cacheId()); + assertEquals(e0.eventType(), e1.eventType()); + assertEquals(e0.isFiltered(), e1.isFiltered()); + assertEquals(GridLongList.asList(e0.filteredEvents()), GridLongList.asList(e1.filteredEvents())); + assertEquals(e0.isBackup(), e1.isBackup()); + assertEquals(e0.isKeepBinary(), e1.isKeepBinary()); + assertEquals(e0.partition(), e1.partition()); + assertEquals(e0.updateCounter(), e1.updateCounter()); + + // Key and value shouldn't be serialized in case an event is filtered. + assertNull(e1.key()); + assertNotNull(e0.key()); + assertNull(e1.oldValue()); + assertNotNull(e0.oldValue()); + assertNull(e1.value()); + assertNotNull(e0.value()); + } + + /** + * + */ + private static class FilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> { + /** {@inheritDoc} */ + @Override public CacheEntryEventFilter<Object, Object> create() { + return new CacheEventFilter(); + } + } + + /** + * + */ + private static class CacheEventFilter implements CacheEntryEventFilter<Object, Object>, Serializable { + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) { + events.add(evt); + + return false; + } + } + + /** + * + */ + private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + // No-op + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/046657ba/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index e0e81b7..dbbb3ed 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest; +import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryImmutableEntryTest; /** * Test suite for cache queries. @@ -103,6 +104,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class); suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class); suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryImmutableEntryTest.class); return suite; }
