This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 02bf42f5fbb IGNITE-26612 Use MessageSerializer for
MetadataResponseMessage (#12425)
02bf42f5fbb is described below
commit 02bf42f5fbb869f85ddda5a080ff0e8b38e8aabf
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Oct 23 13:19:58 2025 +0500
IGNITE-26612 Use MessageSerializer for MetadataResponseMessage (#12425)
---
.../communication/GridIoMessageFactory.java | 7 +-
.../cache/binary/BinaryMetadataTransport.java | 18 ++-
.../cache/binary/BinaryMetadataVersionInfo.java | 95 +++++++++++--
.../cache/binary/MetadataResponseMessage.java | 149 +++------------------
.../main/resources/META-INF/classnames.properties | 1 -
5 files changed, 117 insertions(+), 153 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 1531303a1d5..37f09d2aaff 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridTaskCancelRequest;
import org.apache.ignite.internal.GridTaskSessionRequest;
import org.apache.ignite.internal.IgniteDiagnosticMessage;
import
org.apache.ignite.internal.codegen.AtomicApplicationAttributesAwareRequestSerializer;
+import org.apache.ignite.internal.codegen.BinaryMetadataVersionInfoSerializer;
import
org.apache.ignite.internal.codegen.CacheContinuousQueryBatchAckSerializer;
import org.apache.ignite.internal.codegen.CacheEntryInfoCollectionSerializer;
import org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
@@ -98,6 +99,7 @@ import
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSeriali
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
import org.apache.ignite.internal.codegen.LatchAckMessageSerializer;
import org.apache.ignite.internal.codegen.MetadataRequestMessageSerializer;
+import org.apache.ignite.internal.codegen.MetadataResponseMessageSerializer;
import
org.apache.ignite.internal.codegen.MissingMappingRequestMessageSerializer;
import
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
@@ -139,6 +141,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import
org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
+import
org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionInfo;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
import
org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
@@ -347,7 +350,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)78, MissingMappingRequestMessage::new, new
MissingMappingRequestMessageSerializer());
factory.register((short)79, MissingMappingResponseMessage::new, new
MissingMappingResponseMessageSerializer());
factory.register((short)80, MetadataRequestMessage::new, new
MetadataRequestMessageSerializer());
- factory.register((short)81, MetadataResponseMessage::new);
+ factory.register((short)81, MetadataResponseMessage::new, new
MetadataResponseMessageSerializer());
factory.register((short)82, JobStealingRequest::new, new
JobStealingRequestSerializer());
factory.register((short)84, GridByteArrayList::new);
factory.register((short)86, GridCacheVersion::new, new
GridCacheVersionSerializer());
@@ -432,6 +435,8 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(CacheWriteSynchronizationModeMessage.TYPE_CODE,
CacheWriteSynchronizationModeMessage::new,
new CacheWriteSynchronizationModeMessageSerializer());
factory.register(GridCacheOperationMessage.TYPE_CODE,
GridCacheOperationMessage::new, new GridCacheOperationMessageSerializer());
+ factory.register(BinaryMetadataVersionInfo.TYPE_CODE,
BinaryMetadataVersionInfo::new,
+ new BinaryMetadataVersionInfoSerializer());
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
// [120..123] - DR
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 6b1582c1374..908fdc41f2b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -842,20 +842,16 @@ final class BinaryMetadataTransport {
MetadataResponseMessage resp = new MetadataResponseMessage(typeId);
- byte[] binMetaBytes = null;
-
if (metaVerInfo != null) {
try {
- binMetaBytes = U.marshal(ctx, metaVerInfo);
+ metaVerInfo.marshalMetadata();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal binary metadata for
[typeId=" + typeId + ']', e);
-
- resp.markErrorOnRequest();
}
}
- resp.binaryMetadataBytes(binMetaBytes);
+ resp.metadataVersionInfo(metaVerInfo);
try {
ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_METADATA_REQ,
resp, SYSTEM_POOL);
@@ -882,21 +878,23 @@ final class BinaryMetadataTransport {
int typeId = msg0.typeId();
- byte[] binMetaBytes = msg0.binaryMetadataBytes();
-
ClientMetadataRequestFuture fut = clientReqSyncMap.get(typeId);
if (fut == null)
return;
- if (msg0.metadataNotFound()) {
+ BinaryMetadataVersionInfo metaVerInfo = msg0.metadataVersionInfo();
+
+ if (metaVerInfo == null) {
fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
return;
}
try {
- casBinaryMetadata(typeId, U.unmarshal(ctx, binMetaBytes,
U.resolveClassLoader(ctx.config())));
+ metaVerInfo.unmarshalMetadata();
+
+ casBinaryMetadata(typeId, metaVerInfo);
fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java
index 36c8f98418e..88aff49bdd4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java
@@ -17,7 +17,13 @@
package org.apache.ignite.internal.processors.cache.binary;
import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+import static org.apache.ignite.marshaller.Marshallers.jdk;
/**
* Wrapper for {@link BinaryMetadata} which is stored in metadata local cache
on each node.
@@ -25,28 +31,42 @@ import org.apache.ignite.internal.binary.BinaryMetadata;
* The version refers solely to the internal protocol for updating
BinaryMetadata and is unknown externally.
* It can be updated dynamically from different nodes and threads on the same
node.
*/
-final class BinaryMetadataVersionInfo implements Serializable {
+public final class BinaryMetadataVersionInfo implements Serializable, Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 505;
+
/** */
private static final long serialVersionUID = 0L;
/** The actual binary metadata. */
- private final BinaryMetadata metadata;
+ private BinaryMetadata metadata;
+
+ /** Serialized binary metadata. */
+ @Order(0)
+ private transient byte[] metadataBytes;
/**
* The version of metadata that has been proposed for update. This
represents how many unique updates have been issued
* for this type. When a metadata update is proposed, this version is
incremented.
*/
- private final int pendingVer;
+ @Order(value = 1, method = "pendingVersion")
+ private int pendingVer;
/**
* The version of metadata that has been accepted by the entire cluster.
* This represents the number of updates that have been confirmed across
all nodes.
*/
- private final int acceptedVer;
+ @Order(value = 2, method = "acceptedVersion")
+ private int acceptedVer;
/** A flag indicating whether the metadata is currently being removed. */
private final transient boolean removing;
+ /** No-arg constructor: sets {@code removing} to {@code false} and leaves all other fields at their defaults. */
+ public BinaryMetadataVersionInfo() {
+ removing = false;
+ }
+
/**
* @param metadata Metadata.
*/
@@ -86,26 +106,40 @@ final class BinaryMetadataVersionInfo implements
Serializable {
}
/**
- *
+ * @return Binary metadata.
*/
BinaryMetadata metadata() {
return metadata;
}
/**
- *
+ * @return The version of metadata that has been proposed for update.
*/
- int pendingVersion() {
+ public int pendingVersion() {
return pendingVer;
}
/**
- *
+ * @param pendingVer The version of metadata that has been proposed for
update.
+ */
+ public void pendingVersion(int pendingVer) {
+ this.pendingVer = pendingVer;
+ }
+
+ /**
+ * @return The version of metadata that has been accepted by the entire
cluster.
*/
- int acceptedVersion() {
+ public int acceptedVersion() {
return acceptedVer;
}
+ /**
+ * @param acceptedVer The version of metadata that has been accepted by
the entire cluster.
+ */
+ public void acceptedVersion(int acceptedVer) {
+ this.acceptedVer = acceptedVer;
+ }
+
/**
* @return {@code true} if the metadata is being removed now.
*/
@@ -113,6 +147,44 @@ final class BinaryMetadataVersionInfo implements
Serializable {
return removing;
}
+ /**
+ * @return Serialized binary metadata, or {@code null} if none is held (e.g. after {@link #unmarshalMetadata()} cleared it).
+ */
+ public byte[] metadataBytes() {
+ return metadataBytes;
+ }
+
+ /**
+ * @param metadataBytes Serialized binary metadata.
+ */
+ public void metadataBytes(byte[] metadataBytes) {
+ this.metadataBytes = metadataBytes;
+ }
+
+ /**
+ * Marshals {@code metadata} into {@code metadataBytes} with the JDK marshaller; no-op if the bytes are already set.
+ *
+ * NOTE(review): if marshalling throws, {@code metadataBytes} stays {@code null}. An instance sent in that state
+ * presumably reaches the receiver with neither metadata nor bytes - confirm callers handle this case.
+ *
+ * @throws IgniteCheckedException If marshalling failed.
+ */
+ public void marshalMetadata() throws IgniteCheckedException {
+ if (metadataBytes == null)
+ metadataBytes = U.marshal(jdk(), metadata);
+ }
+
+ /**
+ * Restores {@code metadata} from {@code metadataBytes}; no-op if metadata is already set or there are no bytes.
+ *
+ * @throws IgniteCheckedException If unmarshalling failed.
+ */
+ public void unmarshalMetadata() throws IgniteCheckedException {
+ if (metadata == null && metadataBytes != null) {
+ metadata = U.unmarshal(jdk(), metadataBytes, U.gridClassLoader());
+
+ // Drop the serialized form once the object is restored: it is not required anymore.
+ metadataBytes = null;
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return "[typeId=" + metadata.typeId() +
@@ -121,4 +193,9 @@ final class BinaryMetadataVersionInfo implements
Serializable {
", removing=" + removing +
"]";
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
index ec3b1d9a331..160162e58f5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
@@ -16,24 +16,21 @@
*/
package org.apache.ignite.internal.processors.cache.binary;
-import java.nio.ByteBuffer;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Carries latest version of metadata to client as a response for {@link
MetadataRequestMessage}.
*/
public class MetadataResponseMessage implements Message {
- /** */
+ /** Type ID. */
+ @Order(0)
private int typeId;
- /** */
- private byte[] binaryMetadataBytes;
-
- /** */
- private ClientResponseStatus status;
+ /** Binary metadata version info. */
+ @Order(value = 1, method = "metadataVersionInfo")
+ private BinaryMetadataVersionInfo metaVerInfo;
/** */
public MetadataResponseMessage() {
@@ -41,155 +38,43 @@ public class MetadataResponseMessage implements Message {
}
/**
- * @param typeId Type id.
+ * @param typeId Type ID.
*/
MetadataResponseMessage(int typeId) {
this.typeId = typeId;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(binaryMetadataBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeByte(status != null ? (byte)status.ordinal()
: -1))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeInt(typeId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- binaryMetadataBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- byte statusOrd;
-
- statusOrd = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- status = ClientResponseStatus.fromOrdinal(statusOrd);
-
- reader.incrementState();
-
- case 2:
- typeId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 81;
}
/**
- * @param bytes Binary metadata bytes.
+ * @return Binary metadata version info.
*/
- void binaryMetadataBytes(byte[] bytes) {
- if (bytes != null)
- status = ClientResponseStatus.METADATA_FOUND;
- else
- status = ClientResponseStatus.METADATA_NOT_FOUND;
-
- binaryMetadataBytes = bytes;
+ public BinaryMetadataVersionInfo metadataVersionInfo() {
+ return metaVerInfo;
}
/**
- * Marks message if any exception happened during preparing response.
+ * @param metaVerInfo Binary metadata version info.
*/
- void markErrorOnRequest() {
- status = ClientResponseStatus.ERROR;
+ public void metadataVersionInfo(BinaryMetadataVersionInfo metaVerInfo) {
+ this.metaVerInfo = metaVerInfo;
}
/**
* @return Type ID.
*/
- int typeId() {
+ public int typeId() {
return typeId;
}
/**
- * @return Marshalled BinaryMetadata.
+ * @param typeId Type ID.
*/
- byte[] binaryMetadataBytes() {
- return binaryMetadataBytes;
- }
-
- /**
- * @return {@code true} if metadata was not found on server node replied
with the response.
- */
- boolean metadataNotFound() {
- return status == ClientResponseStatus.METADATA_NOT_FOUND;
- }
-
- /**
- * Response statuses enum.
- */
- private enum ClientResponseStatus {
- /** */
- METADATA_FOUND,
-
- /** */
- METADATA_NOT_FOUND,
-
- /** */
- ERROR;
-
- /** Enumerated values. */
- private static final ClientResponseStatus[] VALS = values();
-
- /**
- * Efficiently gets enumerated value from its ordinal.
- *
- * @param ord Ordinal value.
- * @return Enumerated value.
- */
- public static ClientResponseStatus fromOrdinal(byte ord) {
- return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
- }
+ public void typeId(int typeId) {
+ this.typeId = typeId;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 8b4f49c41ad..c578fc5101c 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1042,7 +1042,6 @@
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage$ProposalStatus
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage
-org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage$ClientResponseStatus
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage$ProposalStatus