This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d008388a910 IGNITE-26736 Use MessageSerializer for Calcite's ErrorMessage (#12457)
d008388a910 is described below

commit d008388a910247ae4388ff7b947df516221a8f81
Author: Vladimir Steshin <[email protected]>
AuthorDate: Sun Nov 2 15:12:52 2025 +0300

    IGNITE-26736 Use MessageSerializer for Calcite's ErrorMessage (#12457)
---
 .../query/calcite/exec/ExchangeServiceImpl.java    |   4 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |   6 +-
 .../query/calcite/message/CalciteErrorMessage.java |  84 +++++++++++
 .../query/calcite/message/ErrorMessage.java        | 167 ---------------------
 .../query/calcite/message/MessageType.java         |   3 +-
 .../managers/communication/ErrorMessage.java       |   3 +-
 6 files changed, 93 insertions(+), 174 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 14f6357a612..445a42e7c2c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -32,7 +32,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
-import 
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
+import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
@@ -171,7 +171,7 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
 
     /** {@inheritDoc} */
     @Override public void sendError(UUID nodeId, UUID qryId, long fragmentId, 
Throwable err) throws IgniteCheckedException {
-        messageService().send(nodeId, new ErrorMessage(qryId, fragmentId, 
err));
+        messageService().send(nodeId, new CalciteErrorMessage(qryId, 
fragmentId, err));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 23b6424db39..5fe024f934e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -74,7 +74,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTr
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.PerformanceStatisticsIoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
-import 
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
+import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
@@ -472,7 +472,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     @Override public void init() {
         messageService().register((n, m) -> onMessage(n, 
(QueryStartRequest)m), MessageType.QUERY_START_REQUEST);
         messageService().register((n, m) -> onMessage(n, 
(QueryStartResponse)m), MessageType.QUERY_START_RESPONSE);
-        messageService().register((n, m) -> onMessage(n, (ErrorMessage)m), 
MessageType.QUERY_ERROR_MESSAGE);
+        messageService().register((n, m) -> onMessage(n, 
(CalciteErrorMessage)m), MessageType.QUERY_ERROR_MESSAGE);
 
         eventManager().addDiscoveryEventListener(discoLsnr, 
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
@@ -945,7 +945,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     }
 
     /** */
-    private void onMessage(UUID nodeId, ErrorMessage msg) {
+    private void onMessage(UUID nodeId, CalciteErrorMessage msg) {
         assert nodeId != null && msg != null;
 
         Query<?> qry = qryReg.query(msg.queryId());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
new file mode 100644
index 00000000000..00b8f2e513b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.message;
+
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
+
+/**
+ * Calcite error message: extends the communication {@link ErrorMessage} with a query ID and a fragment ID.
+ */
+public class CalciteErrorMessage extends ErrorMessage implements 
CalciteMessage {
+    /** Query ID. */
+    @Order(value = 1, method = "queryId")
+    private UUID qryId;
+
+    /** Fragment ID. */
+    @Order(2)
+    private long fragmentId;
+
+    /** Empty constructor; referenced as {@code CalciteErrorMessage::new} by {@code MessageType.QUERY_ERROR_MESSAGE}. */
+    public CalciteErrorMessage() {
+        // No-op.
+    }
+
+    /** Creates a message for the given query ID, fragment ID and error; {@code err} is asserted to be non-null. */
+    public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
+        super(err);
+
+        assert err != null;
+
+        this.qryId = qryId;
+        this.fragmentId = fragmentId;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public UUID queryId() {
+        return qryId;
+    }
+
+    /** Sets the query ID. */
+    public void queryId(UUID qryId) {
+        this.qryId = qryId;
+    }
+
+    /**
+     * @return Fragment ID.
+     */
+    public long fragmentId() {
+        return fragmentId;
+    }
+
+    /** Sets the fragment ID. */
+    public void fragmentId(long fragmentId) {
+        this.fragmentId = fragmentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public MessageType type() {
+        return MessageType.QUERY_ERROR_MESSAGE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return MessageType.QUERY_ERROR_MESSAGE.directType();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
deleted file mode 100644
index 030d13ff262..00000000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.message;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class ErrorMessage implements MarshalableMessage {
-    /** */
-    private UUID queryId;
-
-    /** */
-    private long fragmentId;
-
-    /** */
-    private byte[] errBytes;
-
-    /** */
-    @GridDirectTransient
-    private Throwable err;
-
-    /** */
-    public ErrorMessage() {
-        // No-op.
-    }
-
-    /** */
-    public ErrorMessage(UUID queryId, long fragmentId, Throwable err) {
-        assert err != null;
-
-        this.queryId = queryId;
-        this.fragmentId = fragmentId;
-        this.err = err;
-    }
-
-    /**
-     * @return Query ID.
-     */
-    public UUID queryId() {
-        return queryId;
-    }
-
-    /**
-     * @return Fragment ID.
-     */
-    public long fragmentId() {
-        return fragmentId;
-    }
-
-    /**
-     * @return Marshaled Throwable.
-     */
-    public Throwable error() {
-        assert err != null;
-
-        return err;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray(errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong(fragmentId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeUuid(queryId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                errBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                fragmentId = reader.readLong();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                queryId = reader.readUuid();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_ERROR_MESSAGE;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        errBytes = U.marshal(ctx, err);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        if (errBytes != null)
-            err = U.unmarshal(ctx, errBytes, 
U.resolveClassLoader(ctx.gridConfig()));
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index e8932e7dab9..e2424054d55 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.function.Supplier;
+import org.apache.ignite.internal.codegen.CalciteErrorMessageSerializer;
 import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
 import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
 import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
@@ -42,7 +43,7 @@ public enum MessageType {
     QUERY_START_RESPONSE(301, QueryStartResponse::new, new 
QueryStartResponseSerializer()),
 
     /** */
-    QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
+    QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new 
CalciteErrorMessageSerializer()),
 
     /** */
     QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
index 71b7955d48f..8bed97aa47f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.MessageProcessor;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -101,7 +102,7 @@ public class ErrorMessage implements Message {
      * @see MessageWriter
      */
     public void errorBytes(@Nullable byte[] errBytes) {
-        if (errBytes == null)
+        if (F.isEmpty(errBytes))
             err = null;
         else {
             try {

Reply via email to