This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d008388a910 IGNITE-26736 Use MessageSerializer for Calcite's
ErrorMessage (#12457)
d008388a910 is described below
commit d008388a910247ae4388ff7b947df516221a8f81
Author: Vladimir Steshin <[email protected]>
AuthorDate: Sun Nov 2 15:12:52 2025 +0300
IGNITE-26736 Use MessageSerializer for Calcite's ErrorMessage (#12457)
---
.../query/calcite/exec/ExchangeServiceImpl.java | 4 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 6 +-
.../query/calcite/message/CalciteErrorMessage.java | 84 +++++++++++
.../query/calcite/message/ErrorMessage.java | 167 ---------------------
.../query/calcite/message/MessageType.java | 3 +-
.../managers/communication/ErrorMessage.java | 3 +-
6 files changed, 93 insertions(+), 174 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 14f6357a612..445a42e7c2c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -32,7 +32,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
-import
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
@@ -171,7 +171,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** {@inheritDoc} */
@Override public void sendError(UUID nodeId, UUID qryId, long fragmentId,
Throwable err) throws IgniteCheckedException {
- messageService().send(nodeId, new ErrorMessage(qryId, fragmentId,
err));
+ messageService().send(nodeId, new CalciteErrorMessage(qryId,
fragmentId, err));
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 23b6424db39..5fe024f934e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -74,7 +74,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTr
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.PerformanceStatisticsIoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
-import
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
@@ -472,7 +472,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
@Override public void init() {
messageService().register((n, m) -> onMessage(n,
(QueryStartRequest)m), MessageType.QUERY_START_REQUEST);
messageService().register((n, m) -> onMessage(n,
(QueryStartResponse)m), MessageType.QUERY_START_RESPONSE);
- messageService().register((n, m) -> onMessage(n, (ErrorMessage)m),
MessageType.QUERY_ERROR_MESSAGE);
+ messageService().register((n, m) -> onMessage(n,
(CalciteErrorMessage)m), MessageType.QUERY_ERROR_MESSAGE);
eventManager().addDiscoveryEventListener(discoLsnr,
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@ -945,7 +945,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
}
/** */
- private void onMessage(UUID nodeId, ErrorMessage msg) {
+ private void onMessage(UUID nodeId, CalciteErrorMessage msg) {
assert nodeId != null && msg != null;
Query<?> qry = qryReg.query(msg.queryId());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
new file mode 100644
index 00000000000..00b8f2e513b
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.message;
+
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
+
+/**
+ * Calcite error message: a core {@link ErrorMessage} that additionally carries a query ID and a fragment ID.
+ */
+public class CalciteErrorMessage extends ErrorMessage implements
CalciteMessage {
+ /** Query ID. */
+ @Order(value = 1, method = "queryId")
+ private UUID qryId;
+
+ /** Fragment ID. */
+ @Order(2)
+ private long fragmentId;
+
+ /** Default constructor; does not set the query ID or the fragment ID. */
+ public CalciteErrorMessage() {
+ // No-op.
+ }
+
+ /** Creates a message for the given query ID and fragment ID; {@code err} is passed to the parent and asserted non-null. */
+ public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
+ super(err);
+
+ assert err != null;
+
+ this.qryId = qryId;
+ this.fragmentId = fragmentId;
+ }
+
+ /**
+ * @return Query ID.
+ */
+ public UUID queryId() {
+ return qryId;
+ }
+
+ /** Sets the query ID. */
+ public void queryId(UUID qryId) {
+ this.qryId = qryId;
+ }
+
+ /**
+ * @return Fragment ID.
+ */
+ public long fragmentId() {
+ return fragmentId;
+ }
+
+ /** Sets the fragment ID. */
+ public void fragmentId(long fragmentId) {
+ this.fragmentId = fragmentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessageType type() {
+ return MessageType.QUERY_ERROR_MESSAGE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return MessageType.QUERY_ERROR_MESSAGE.directType();
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
deleted file mode 100644
index 030d13ff262..00000000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.message;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class ErrorMessage implements MarshalableMessage {
- /** */
- private UUID queryId;
-
- /** */
- private long fragmentId;
-
- /** */
- private byte[] errBytes;
-
- /** */
- @GridDirectTransient
- private Throwable err;
-
- /** */
- public ErrorMessage() {
- // No-op.
- }
-
- /** */
- public ErrorMessage(UUID queryId, long fragmentId, Throwable err) {
- assert err != null;
-
- this.queryId = queryId;
- this.fragmentId = fragmentId;
- this.err = err;
- }
-
- /**
- * @return Query ID.
- */
- public UUID queryId() {
- return queryId;
- }
-
- /**
- * @return Fragment ID.
- */
- public long fragmentId() {
- return fragmentId;
- }
-
- /**
- * @return Marshaled Throwable.
- */
- public Throwable error() {
- assert err != null;
-
- return err;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong(fragmentId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeUuid(queryId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- fragmentId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- queryId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_ERROR_MESSAGE;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- errBytes = U.marshal(ctx, err);
- }
-
- /** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (errBytes != null)
- err = U.unmarshal(ctx, errBytes,
U.resolveClassLoader(ctx.gridConfig()));
- }
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index e8932e7dab9..e2424054d55 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.message;
import java.util.function.Supplier;
+import org.apache.ignite.internal.codegen.CalciteErrorMessageSerializer;
import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
@@ -42,7 +43,7 @@ public enum MessageType {
QUERY_START_RESPONSE(301, QueryStartResponse::new, new
QueryStartResponseSerializer()),
/** */
- QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
+ QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new
CalciteErrorMessageSerializer()),
/** */
QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
index 71b7955d48f..8bed97aa47f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.MessageProcessor;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -101,7 +102,7 @@ public class ErrorMessage implements Message {
* @see MessageWriter
*/
public void errorBytes(@Nullable byte[] errBytes) {
- if (errBytes == null)
+ if (F.isEmpty(errBytes))
err = null;
else {
try {