This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fca53d51cb4 IGNITE-27553 Use MessageSerializer for
SchemaAbstractDiscoveryMessage and successors (#12632)
fca53d51cb4 is described below
commit fca53d51cb4d9ed648d55ec6b397082c88e259de
Author: Ilya Shishkov <[email protected]>
AuthorDate: Tue Mar 3 17:43:56 2026 +0300
IGNITE-27553 Use MessageSerializer for SchemaAbstractDiscoveryMessage and
successors (#12632)
---
.../discovery/DiscoveryMessageFactory.java | 6 ++
.../processors/query/GridQueryProcessor.java | 8 +-
.../message/SchemaAbstractDiscoveryMessage.java | 105 ++++++++++++++++++++-
.../message/SchemaFinishDiscoveryMessage.java | 44 ++++-----
.../message/SchemaProposeDiscoveryMessage.java | 46 ++++-----
.../failure/FailureHandlerTriggeredTest.java | 4 +
6 files changed, 147 insertions(+), 66 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 221e7708ae0..db8a07604b5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -23,6 +23,10 @@ import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAccepted
import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessageSerializer;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
+import
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessageSerializer;
+import
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
+import
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
@@ -133,5 +137,7 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)501, SecurityAwareCustomMessageWrapper::new,
new SecurityAwareCustomMessageWrapperSerializer());
factory.register((short)502, MetadataRemoveAcceptedMessage::new, new
MetadataRemoveAcceptedMessageSerializer());
factory.register((short)503, MetadataRemoveProposedMessage::new, new
MetadataRemoveProposedMessageSerializer());
+ factory.register((short)504, SchemaProposeDiscoveryMessage::new, new
SchemaProposeDiscoveryMessageSerializer());
+ factory.register((short)505, SchemaFinishDiscoveryMessage::new, new
SchemaFinishDiscoveryMessageSerializer());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 894382807e1..6df09f83c41 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -909,9 +909,6 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
}
}
- // Propose message will be used from exchange thread to
- msg.proposeMessage(proposeMsg);
-
if (exchangeReady) {
SchemaOperation op = schemaOps.get(proposeMsg.schemaName());
@@ -1930,7 +1927,10 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
*/
public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable
SchemaOperationException err, boolean nop) {
synchronized (stateMux) {
- SchemaFinishDiscoveryMessage msg = new
SchemaFinishDiscoveryMessage(op, err, nop);
+ SchemaFinishDiscoveryMessage msg = new
SchemaFinishDiscoveryMessage(op, nop);
+
+ if (err != null)
+ msg.onError(err);
try {
ctx.discovery().sendCustomEvent(msg);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
index f55eae0922f..fc4642aa514 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
@@ -17,29 +17,62 @@
package org.apache.ignite.internal.processors.query.schema.message;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import
org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshallers;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
* Abstract discovery message for schema operations.
*/
-public abstract class SchemaAbstractDiscoveryMessage implements
DiscoveryCustomMessage {
+public abstract class SchemaAbstractDiscoveryMessage implements
DiscoveryCustomMessage, Message {
/** */
private static final long serialVersionUID = 0L;
/** ID */
- private final IgniteUuid id = IgniteUuid.randomUuid();
+ @Order(0)
+ IgniteUuid id;
/** Operation. */
@GridToStringInclude
- protected final SchemaAbstractOperation op;
+ private SchemaAbstractOperation op;
+
+ /**
+ * Operation bytes. Serialized representation of schema operation.
+ * TODO Should be removed in IGNITE-27559
+ */
+ @Order(value = 1, method = "operationBytes")
+ byte[] opBytes;
+
+ /** Error message. */
+ @Order(2)
+ String errMsg;
+
+ /** Error code. */
+ @Order(3)
+ int errCode;
+
+ /** Error. */
+ SchemaOperationException err;
+
+ /**
+ * Constructor.
+ */
+ protected SchemaAbstractDiscoveryMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -47,6 +80,9 @@ public abstract class SchemaAbstractDiscoveryMessage
implements DiscoveryCustomM
* @param op Operation.
*/
protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) {
+ id = IgniteUuid.randomUuid();
+ errCode = -1;
+
this.op = op;
}
@@ -65,7 +101,68 @@ public abstract class SchemaAbstractDiscoveryMessage
implements DiscoveryCustomM
* @return Operation.
*/
public SchemaAbstractOperation operation() {
- return op;
+ try {
+ return op != null ? op : U.unmarshal(Marshallers.jdk(), opBytes,
null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to unmarshal schema operation",
e);
+ }
+ }
+
+ /**
+ * @return Operation bytes.
+ */
+ public byte[] operationBytes() {
+ try {
+ return opBytes != null ? opBytes : U.marshal(Marshallers.jdk(),
op);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to marshal schema operation", e);
+ }
+ }
+
+ /**
+ * @param opBytes Operation bytes.
+ */
+ public void operationBytes(byte[] opBytes) {
+ this.opBytes = opBytes;
+ }
+
+ /**
+ * Set error.
+ *
+ * @param err Error.
+ */
+ public void onError(SchemaOperationException err) {
+ if (!hasError()) {
+ this.err = err;
+
+ errMsg = err.getMessage();
+ errCode = err.code();
+
+ if (err.getCause() != null)
+ errMsg += ": " + err.getCause().getMessage();
+ }
+ }
+
+ /**
+ * @return {@code True} if error was reported during init.
+ */
+ public boolean hasError() {
+ return err != null || errMsg != null || errCode > -1;
+ }
+
+ /**
+ * @return Error (if any), restored from the error message and code when needed.
+ */
+ @Nullable public SchemaOperationException error() {
+ if (!hasError())
+ return null;
+
+ if (err == null)
+ err = new SchemaOperationException(errMsg, errCode);
+
+ return err;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
index 57d835152cb..a0204816560 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.query.schema.message;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import
org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -30,23 +30,26 @@ public class SchemaFinishDiscoveryMessage extends
SchemaAbstractDiscoveryMessage
/** */
private static final long serialVersionUID = 0L;
- /** Error. */
- private final SchemaOperationException err;
-
/** No-op flag. */
- private final boolean nop;
+ @Order(0)
+ boolean nop;
+
+ /**
+ * Constructor.
+ */
+ public SchemaFinishDiscoveryMessage() {
+ // No-op.
+ }
/**
* Constructor.
*
* @param op Original operation.
- * @param err Error.
* @param nop No-op flag.
*/
- public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op,
SchemaOperationException err, boolean nop) {
+ public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, boolean
nop) {
super(op);
- this.err = err;
this.nop = nop;
}
@@ -65,26 +68,6 @@ public class SchemaFinishDiscoveryMessage extends
SchemaAbstractDiscoveryMessage
return false;
}
- /**
- * @return {@code True} if error was reported during init.
- */
- public boolean hasError() {
- return err != null;
- }
-
- /**
- * @return Error message (if any).
- */
- @Nullable public SchemaOperationException error() {
- return err;
- }
-
- /**
- * @param proposeMsg Propose message.
- */
- public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) {
- }
-
/**
* @return <code>True</code> if message in no-op.
*/
@@ -96,4 +79,9 @@ public class SchemaFinishDiscoveryMessage extends
SchemaAbstractDiscoveryMessage
@Override public String toString() {
return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent",
super.toString());
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 505;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
index 0e1270b17b6..1cf398d2fd9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.query.schema.message;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import
org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -32,13 +32,18 @@ public class SchemaProposeDiscoveryMessage extends
SchemaAbstractDiscoveryMessag
private static final long serialVersionUID = 0L;
/** Cache deployment ID. */
- private IgniteUuid depId;
-
- /** Error. */
- private SchemaOperationException err;
+ @Order(0)
+ IgniteUuid depId;
/** Whether to perform exchange. */
- private transient boolean exchange;
+ private boolean exchange;
+
+ /**
+ * Constructor.
+ */
+ public SchemaProposeDiscoveryMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -93,30 +98,6 @@ public class SchemaProposeDiscoveryMessage extends
SchemaAbstractDiscoveryMessag
return deploymentId() != null || hasError();
}
- /**
- * Set error.
- *
- * @param err Error.
- */
- public void onError(SchemaOperationException err) {
- if (!hasError())
- this.err = err;
- }
-
- /**
- * @return {@code True} if error was reported during init.
- */
- public boolean hasError() {
- return err != null;
- }
-
- /**
- * @return Error message (if any).
- */
- @Nullable public SchemaOperationException error() {
- return err;
- }
-
/**
* @return Schema name.
*/
@@ -128,4 +109,9 @@ public class SchemaProposeDiscoveryMessage extends
SchemaAbstractDiscoveryMessag
@Override public String toString() {
return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent",
super.toString());
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 504;
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
index 046124622ca..f19b39ea6bc 100644
---
a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
@@ -128,6 +128,10 @@ public class FailureHandlerTriggeredTest extends
GridCommonAbstractTest {
@Override public boolean isMutable() {
return false;
}
+
+ @Override public short directType() {
+ return 0;
+ }
});
}