This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 99231cfe1f7 IGNITE-27845 Use MessageSerializer for
CacheContinuousQueryEntry (#12744)
99231cfe1f7 is described below
commit 99231cfe1f76ce8e71e04c9b82fa7688c39bcde0
Author: Alexey Abashev <[email protected]>
AuthorDate: Wed Mar 4 16:36:55 2026 +0300
IGNITE-27845 Use MessageSerializer for CacheContinuousQueryEntry (#12744)
---
.../ignite/internal/GridCodegenConverter.java | 56 -----
.../communication/GridIoMessageFactory.java | 3 +-
.../continuous/CacheContinuousQueryEntry.java | 255 +++++----------------
...niteCacheContinuousQueryImmutableEntryTest.java | 14 +-
4 files changed, 66 insertions(+), 262 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java b/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java
deleted file mode 100644
index 8b68e0bcb00..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridCodegenConverter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotates fields that required custom reading / writing.
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.FIELD)
-public @interface GridCodegenConverter {
- /**
- * Converter code. Replace original field value by converted.
- *
- * @return Code to get field value.
- */
- String get() default "";
-
- /**
- * Converter code. A raw read value is available at $val$ placeholder
- *
- * @return Code to set field value.
- */
- String set() default "";
-
- /**
- * @return Result type. Use this type to read / write the converted value.
- */
- Class<?> type() default Default.class;
-
- /**
- * Used as the default value for converter type
- */
- class Default {
- // No-op.
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 0e4e507b524..400621eb161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -245,6 +245,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuerySerial
import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAckSerializer;
import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
+import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntrySerializer;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntrySerializer;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -453,7 +454,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)93, CacheInvokeDirectResult::new, new
CacheInvokeDirectResultSerializer());
factory.register((short)94, IgniteTxKey::new, new
IgniteTxKeySerializer());
factory.register((short)95, DataStreamerEntry::new);
- factory.register((short)96, CacheContinuousQueryEntry::new);
+ factory.register((short)96, CacheContinuousQueryEntry::new, new
CacheContinuousQueryEntrySerializer());
factory.register((short)97, CacheEvictionEntry::new, new
CacheEvictionEntrySerializer());
factory.register((short)98, CacheEntryPredicateAdapter::new, new
CacheEntryPredicateAdapterSerializer());
factory.register((short)100, IgniteTxEntry::new, new
IgniteTxEntrySerializer());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index a46c4a1f83d..3b7e40f84f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
-import java.nio.ByteBuffer;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridCodegenConverter;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -32,8 +30,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -50,62 +46,52 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
private static final byte KEEP_BINARY = 0b0100;
/** */
- private static final EventType[] EVT_TYPE_VALS = EventType.values();
+ @Order(0)
+ EventType evtType;
- /**
- * @param ord Event type ordinal value.
- * @return Event type.
- */
- @Nullable public static EventType eventTypeFromOrdinal(int ord) {
- return ord >= 0 && ord < EVT_TYPE_VALS.length ? EVT_TYPE_VALS[ord] :
null;
- }
-
- /** */
- @GridCodegenConverter(
- type = byte.class,
- get = "evtType != null ? (byte)evtType.ordinal() : -1",
- set = "eventTypeFromOrdinal($val$)"
- )
- private EventType evtType;
+ /** Flags. */
+ @Order(1)
+ byte flags;
/** Key. */
@GridToStringInclude
- @GridCodegenConverter(get = "isFiltered() ? null : key")
- private KeyCacheObject key;
+ @Order(value = 2, method = "serializedKey")
+ KeyCacheObject key;
/** New value. */
@GridToStringInclude
- @GridCodegenConverter(get = "isFiltered() ? null : newVal")
- private CacheObject newVal;
+ @Order(value = 3, method = "serializedNewValue")
+ CacheObject newVal;
/** Old value. */
@GridToStringInclude
- @GridCodegenConverter(get = "isFiltered() ? null : oldVal")
- private CacheObject oldVal;
+ @Order(value = 4, method = "serializedOldValue")
+ CacheObject oldVal;
/** Cache name. */
- private int cacheId;
+ @Order(5)
+ int cacheId;
/** Deployment info. */
@GridToStringExclude
- @GridDirectTransient
private GridDeploymentInfo depInfo;
/** Partition. */
- private int part;
+ @Order(6)
+ int part;
/** Update counter. */
- private long updateCntr;
-
- /** Flags. */
- private byte flags;
+ @Order(7)
+ long updateCntr;
/** */
@GridToStringInclude
- private AffinityTopologyVersion topVer;
+ @Order(8)
+ AffinityTopologyVersion topVer;
/** */
- private long filteredCnt;
+ @Order(9)
+ long filteredCnt;
/**
* Empty constructor.
@@ -339,6 +325,16 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
return key;
}
+ /** */
+ KeyCacheObject serializedKey() {
+ return isFiltered() ? null : key;
+ }
+
+ /** */
+ void serializedKey(KeyCacheObject key) {
+ this.key = key;
+ }
+
/**
* @return New value.
*/
@@ -346,6 +342,16 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
return newVal;
}
+ /** */
+ CacheObject serializedNewValue() {
+ return isFiltered() ? null : newVal;
+ }
+
+ /** */
+ void serializedNewValue(CacheObject newVal) {
+ this.newVal = newVal;
+ }
+
/**
* @return Old value.
*/
@@ -353,6 +359,16 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
return oldVal;
}
+ /** */
+ CacheObject serializedOldValue() {
+ return isFiltered() ? null : oldVal;
+ }
+
+ /** */
+ void serializedOldValue(CacheObject oldVal) {
+ this.oldVal = oldVal;
+ }
+
/** {@inheritDoc} */
@Override public void prepare(GridDeploymentInfo depInfo) {
this.depInfo = depInfo;
@@ -368,173 +384,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
return 96;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeInt(cacheId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeByte(evtType != null ?
(byte)evtType.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong(filteredCnt))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeByte(flags))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeKeyCacheObject(isFiltered() ? null : key))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeCacheObject(isFiltered() ? null : newVal))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCacheObject(isFiltered() ? null : oldVal))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeInt(part))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeLong(updateCntr))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- cacheId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- evtType = eventTypeFromOrdinal(reader.readByte());
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- filteredCnt = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- flags = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- key = reader.readKeyCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- newVal = reader.readCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- oldVal = reader.readCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- part = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- updateCntr = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheContinuousQueryEntry.class, this);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index 777bea67a89..7dc74b73de6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -153,12 +153,22 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
ByteBuffer buf = ByteBuffer.allocate(4096);
DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
+ var serializer = msgFactory.serializer(e0.directType());
+ assertNotNull("Serializer not found for message type " +
e0.directType(), serializer);
+
+ writer.setBuffer(buf);
+
// Skip write class header.
writer.onHeaderWritten();
- e0.writeTo(buf, writer);
+ serializer.writeTo(e0, writer);
CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry();
- e1.readFrom(ByteBuffer.wrap(buf.array()), new
DirectMessageReader(msgFactory, null));
+
+ final DirectMessageReader reader = new DirectMessageReader(msgFactory,
null);
+
+ reader.setBuffer(ByteBuffer.wrap(buf.array()));
+
+ serializer.readFrom(e1, reader);
assertEquals(e0.cacheId(), e1.cacheId());
assertEquals(e0.eventType(), e1.eventType());