This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new fca53d51cb4 IGNITE-27553 Use MessageSerializer for 
SchemaAbstractDiscoveryMessage and successors (#12632)
fca53d51cb4 is described below

commit fca53d51cb4d9ed648d55ec6b397082c88e259de
Author: Ilya Shishkov <[email protected]>
AuthorDate: Tue Mar 3 17:43:56 2026 +0300

    IGNITE-27553 Use MessageSerializer for SchemaAbstractDiscoveryMessage and 
successors (#12632)
---
 .../discovery/DiscoveryMessageFactory.java         |   6 ++
 .../processors/query/GridQueryProcessor.java       |   8 +-
 .../message/SchemaAbstractDiscoveryMessage.java    | 105 ++++++++++++++++++++-
 .../message/SchemaFinishDiscoveryMessage.java      |  44 ++++-----
 .../message/SchemaProposeDiscoveryMessage.java     |  46 ++++-----
 .../failure/FailureHandlerTriggeredTest.java       |   4 +
 6 files changed, 147 insertions(+), 66 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 221e7708ae0..db8a07604b5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -23,6 +23,10 @@ import 
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAccepted
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessageSerializer;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessageSerializer;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessageSerializer;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
@@ -133,5 +137,7 @@ public class DiscoveryMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)501, SecurityAwareCustomMessageWrapper::new, 
new SecurityAwareCustomMessageWrapperSerializer());
         factory.register((short)502, MetadataRemoveAcceptedMessage::new, new 
MetadataRemoveAcceptedMessageSerializer());
         factory.register((short)503, MetadataRemoveProposedMessage::new, new 
MetadataRemoveProposedMessageSerializer());
+        factory.register((short)504, SchemaProposeDiscoveryMessage::new, new 
SchemaProposeDiscoveryMessageSerializer());
+        factory.register((short)505, SchemaFinishDiscoveryMessage::new, new 
SchemaFinishDiscoveryMessageSerializer());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 894382807e1..6df09f83c41 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -909,9 +909,6 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                 }
             }
 
-            // Propose message will be used from exchange thread to
-            msg.proposeMessage(proposeMsg);
-
             if (exchangeReady) {
                 SchemaOperation op = schemaOps.get(proposeMsg.schemaName());
 
@@ -1930,7 +1927,10 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
      */
     public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable 
SchemaOperationException err, boolean nop) {
         synchronized (stateMux) {
-            SchemaFinishDiscoveryMessage msg = new 
SchemaFinishDiscoveryMessage(op, err, nop);
+            SchemaFinishDiscoveryMessage msg = new 
SchemaFinishDiscoveryMessage(op, nop);
+
+            if (err != null)
+                msg.onError(err);
 
             try {
                 ctx.discovery().sendCustomEvent(msg);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
index f55eae0922f..fc4642aa514 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
@@ -17,29 +17,62 @@
 
 package org.apache.ignite.internal.processors.query.schema.message;
 
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshallers;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract discovery message for schema operations.
  */
-public abstract class SchemaAbstractDiscoveryMessage implements 
DiscoveryCustomMessage {
+public abstract class SchemaAbstractDiscoveryMessage implements 
DiscoveryCustomMessage, Message {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** ID */
-    private final IgniteUuid id = IgniteUuid.randomUuid();
+    @Order(0)
+    IgniteUuid id;
 
     /** Operation. */
     @GridToStringInclude
-    protected final SchemaAbstractOperation op;
+    private SchemaAbstractOperation op;
+
+    /**
+     * Operation bytes. Serialized reprezentation of schema operation.
+     * TODO Should be removed in IGNITE-27559
+     */
+    @Order(value = 1, method = "operationBytes")
+    byte[] opBytes;
+
+    /** Error message. */
+    @Order(2)
+    String errMsg;
+
+    /** Error code. */
+    @Order(3)
+    int errCode;
+
+    /** Error. */
+    SchemaOperationException err;
+
+    /**
+     * Constructor.
+     */
+    protected SchemaAbstractDiscoveryMessage() {
+        // No-op.
+    }
 
     /**
      * Constructor.
@@ -47,6 +80,9 @@ public abstract class SchemaAbstractDiscoveryMessage 
implements DiscoveryCustomM
      * @param op Operation.
      */
     protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) {
+        id = IgniteUuid.randomUuid();
+        errCode = -1;
+
         this.op = op;
     }
 
@@ -65,7 +101,68 @@ public abstract class SchemaAbstractDiscoveryMessage 
implements DiscoveryCustomM
      * @return Operation.
      */
     public SchemaAbstractOperation operation() {
-        return op;
+        try {
+            return op != null ? op : U.unmarshal(Marshallers.jdk(), opBytes, 
null);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to unmarshal schema operation", 
e);
+        }
+    }
+
+    /**
+     * @return Operation bytes.
+     */
+    public byte[] operationBytes() {
+        try {
+            return opBytes != null ? opBytes : U.marshal(Marshallers.jdk(), 
op);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to marshal schema operation", e);
+        }
+    }
+
+    /**
+     * @param opBytes Operation bytes.
+     */
+    public void operationBytes(byte[] opBytes) {
+        this.opBytes = opBytes;
+    }
+
+    /**
+     * Set error.
+     *
+     * @param err Error.
+     */
+    public void onError(SchemaOperationException err) {
+        if (!hasError()) {
+            this.err = err;
+
+            errMsg = err.getMessage();
+            errCode = err.code();
+
+            if (err.getCause() != null)
+                errMsg += ": " + err.getCause().getMessage();
+        }
+    }
+
+    /**
+     * @return {@code True} if error was reported during init.
+     */
+    public boolean hasError() {
+        return err != null || errMsg != null || errCode > -1;
+    }
+
+    /**
+     * @return Error message (if any).
+     */
+    @Nullable public SchemaOperationException error() {
+        if (!hasError())
+            return null;
+
+        if (err == null)
+            err = new SchemaOperationException(errMsg, errCode);
+
+        return err;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
index 57d835152cb..a0204816560 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.processors.query.schema.message;
 
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
@@ -30,23 +30,26 @@ public class SchemaFinishDiscoveryMessage extends 
SchemaAbstractDiscoveryMessage
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Error. */
-    private final SchemaOperationException err;
-
     /** No-op flag. */
-    private final boolean nop;
+    @Order(0)
+    boolean nop;
+
+    /**
+     * Constructor.
+     */
+    public SchemaFinishDiscoveryMessage() {
+        // No-op.
+    }
 
     /**
      * Constructor.
      *
      * @param op Original operation.
-     * @param err Error.
      * @param nop No-op flag.
      */
-    public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, 
SchemaOperationException err, boolean nop) {
+    public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, boolean 
nop) {
         super(op);
 
-        this.err = err;
         this.nop = nop;
     }
 
@@ -65,26 +68,6 @@ public class SchemaFinishDiscoveryMessage extends 
SchemaAbstractDiscoveryMessage
         return false;
     }
 
-    /**
-     * @return {@code True} if error was reported during init.
-     */
-    public boolean hasError() {
-        return err != null;
-    }
-
-    /**
-     * @return Error message (if any).
-     */
-    @Nullable public SchemaOperationException error() {
-        return err;
-    }
-
-    /**
-     * @param proposeMsg Propose message.
-     */
-    public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) {
-    }
-
     /**
      * @return <code>True</code> if message in no-op.
      */
@@ -96,4 +79,9 @@ public class SchemaFinishDiscoveryMessage extends 
SchemaAbstractDiscoveryMessage
     @Override public String toString() {
         return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", 
super.toString());
     }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 505;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
index 0e1270b17b6..1cf398d2fd9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.processors.query.schema.message;
 
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -32,13 +32,18 @@ public class SchemaProposeDiscoveryMessage extends 
SchemaAbstractDiscoveryMessag
     private static final long serialVersionUID = 0L;
 
     /** Cache deployment ID. */
-    private IgniteUuid depId;
-
-    /** Error. */
-    private SchemaOperationException err;
+    @Order(0)
+    IgniteUuid depId;
 
     /** Whether to perform exchange. */
-    private transient boolean exchange;
+    private boolean exchange;
+
+    /**
+     * Constructor.
+     */
+    public SchemaProposeDiscoveryMessage() {
+        // No-op.
+    }
 
     /**
      * Constructor.
@@ -93,30 +98,6 @@ public class SchemaProposeDiscoveryMessage extends 
SchemaAbstractDiscoveryMessag
         return deploymentId() != null || hasError();
     }
 
-    /**
-     * Set error.
-     *
-     * @param err Error.
-     */
-    public void onError(SchemaOperationException err) {
-        if (!hasError())
-            this.err = err;
-    }
-
-    /**
-     * @return {@code True} if error was reported during init.
-     */
-    public boolean hasError() {
-        return err != null;
-    }
-
-    /**
-     * @return Error message (if any).
-     */
-    @Nullable public SchemaOperationException error() {
-        return err;
-    }
-
     /**
      * @return Schema name.
      */
@@ -128,4 +109,9 @@ public class SchemaProposeDiscoveryMessage extends 
SchemaAbstractDiscoveryMessag
     @Override public String toString() {
         return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent", 
super.toString());
     }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 504;
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
 
b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
index 046124622ca..f19b39ea6bc 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
@@ -128,6 +128,10 @@ public class FailureHandlerTriggeredTest extends 
GridCommonAbstractTest {
                 @Override public boolean isMutable() {
                     return false;
                 }
+
+                @Override public short directType() {
+                    return 0;
+                }
             });
         }
 

Reply via email to