This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cd4c8cd835b IGNITE-26514 Refactor GridDhtForceKeysResponse (#12407)
cd4c8cd835b is described below
commit cd4c8cd835bdbe186705fc8df74312eae4df0b80
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Oct 17 10:29:30 2025 +0300
IGNITE-26514 Refactor GridDhtForceKeysResponse (#12407)
---
.../managers/communication/ErrorMessage.java | 43 +++--
.../communication/GridIoMessageFactory.java | 3 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 196 +++++++--------------
3 files changed, 93 insertions(+), 149 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
index 71787abccec..62b7bb7972b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
@@ -34,14 +34,14 @@ import static org.apache.ignite.marshaller.Marshallers.jdk;
* to transfer some error as part of some message. See {@link
MessageProcessor} for details.
* <p>Currently, under the hood marshalling and unmarshalling is performed by
{@link JdkMarshaller}.
*/
-@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+@SuppressWarnings({"AssignmentOrReturnOfFieldWithMutableType",
"NullableProblems"})
public class ErrorMessage implements Message {
/** Serialized form of throwable. */
@Order(value = 0, method = "errorBytes")
- private byte @Nullable [] errBytes;
+ private @Nullable byte[] errBytes;
/** Original error. It is transient and necessary only to avoid duplicated
serialization and deserialization. */
- private @Nullable Throwable err;
+ private volatile @Nullable Throwable err;
/**
* Default constructor.
@@ -60,7 +60,7 @@ public class ErrorMessage implements Message {
/**
* @return Serialized form of throwable.
*/
- public byte @Nullable [] errorBytes() {
+ public @Nullable byte[] errorBytes() {
try {
if (errBytes == null && err != null)
errBytes = U.marshal(jdk(), err);
@@ -75,7 +75,7 @@ public class ErrorMessage implements Message {
/**
* @param errBytes New serialized form of throwable.
*/
- public void errorBytes(byte @Nullable [] errBytes) {
+ public void errorBytes(@Nullable byte[] errBytes) {
this.errBytes = errBytes;
}
@@ -83,19 +83,34 @@ public class ErrorMessage implements Message {
* @return Original {@link Throwable}.
*/
public @Nullable Throwable toThrowable() {
- try {
+ if (err != null)
+ return err;
+
+ synchronized (this) {
if (err == null && errBytes != null) {
- err = U.unmarshal(jdk(), errBytes, U.gridClassLoader());
+ try {
+ err = U.unmarshal(jdk(), errBytes, U.gridClassLoader());
- // It is not necessary now.
- errBytes = null;
+ // It is not required anymore.
+ errBytes = null;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
-
- return err;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
}
+
+ return err;
+ }
+
+ /**
+ * Safely gets original error from an error message.
+ *
+ * @param errorMsg Error message.
+ * @return Error contained in the message.
+ */
+ public static @Nullable Throwable error(@Nullable ErrorMessage errorMsg) {
+ return errorMsg == null ? null : errorMsg.toThrowable();
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3cdfc15a6da..2d5d2db0bbd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.codegen.GridDeploymentResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtAtomicNearResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtForceKeysRequestSerializer;
+import org.apache.ignite.internal.codegen.GridDhtForceKeysResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtLockRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtLockResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer;
@@ -308,7 +309,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)40, GridNearAtomicFullUpdateRequest::new);
factory.register((short)41, GridNearAtomicUpdateResponse::new, new
GridNearAtomicUpdateResponseSerializer());
factory.register((short)42, GridDhtForceKeysRequest::new, new
GridDhtForceKeysRequestSerializer());
- factory.register((short)43, GridDhtForceKeysResponse::new);
+ factory.register((short)43, GridDhtForceKeysResponse::new, new
GridDhtForceKeysResponseSerializer());
factory.register((short)45, GridDhtPartitionDemandMessage::new, new
GridDhtPartitionDemandMessageSerializer());
factory.register((short)46, GridDhtPartitionsFullMessage::new);
factory.register((short)47, GridDhtPartitionsSingleMessage::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index 1611b58e1db..e6f24aea5f8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -17,14 +17,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -32,38 +30,35 @@ import
org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
/**
* Force keys response. Contains absent keys.
*/
public class GridDhtForceKeysResponse extends GridCacheIdMessage implements
GridCacheDeployable {
/** Future ID. */
+ @Order(value = 4, method = "futureId")
private IgniteUuid futId;
/** Mini-future ID. */
+ @Order(5)
private IgniteUuid miniId;
- /** Error. */
- @GridDirectTransient
- private volatile IgniteCheckedException err;
-
- /** Serialized error. */
- private byte[] errBytes;
+ /** Error message. */
+ @Order(value = 6, method = "errorMessage")
+ @Nullable private volatile ErrorMessage errMsg;
/** Missed (not found) keys. */
@GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
+ @Order(7)
private List<KeyCacheObject> missedKeys;
/** Cache entries. */
@GridToStringInclude
- @GridDirectCollection(GridCacheEntryInfo.class)
+ @Order(value = 8, method = "forcedInfos")
private List<GridCacheEntryInfo> infos;
/**
@@ -95,26 +90,54 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
* @param err Error.
*/
public void error(IgniteCheckedException err) {
- this.err = err;
+ errorMessage(new ErrorMessage(err));
}
/** {@inheritDoc} */
- @Override public IgniteCheckedException error() {
- return err;
+ @Override public @Nullable Throwable error() {
+ return ErrorMessage.error(errMsg);
+ }
+
+ /**
+ * @return The error message.
+ */
+ @Nullable public ErrorMessage errorMessage() {
+ return errMsg;
+ }
+
+ /**
+ * Sets the error message.
+ *
+ * @param errMsg Error message.
+ */
+ public void errorMessage(@Nullable ErrorMessage errMsg) {
+ this.errMsg = errMsg;
}
/**
* @return Keys.
*/
public Collection<KeyCacheObject> missedKeys() {
- return missedKeys == null ? Collections.<KeyCacheObject>emptyList() :
missedKeys;
+ return F.emptyIfNull(missedKeys);
+ }
+
+ /** @param missedKeys Missed keys. */
+ public void missedKeys(List<KeyCacheObject> missedKeys) {
+ this.missedKeys = missedKeys;
}
/**
* @return Forced entries.
*/
public Collection<GridCacheEntryInfo> forcedInfos() {
- return infos == null ? Collections.<GridCacheEntryInfo>emptyList() :
infos;
+ return F.emptyIfNull(infos);
+ }
+
+ /**
+ * @param infos Forced entries.
+ */
+ public void forcedInfos(List<GridCacheEntryInfo> infos) {
+ this.infos = infos;
}
/**
@@ -124,6 +147,13 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
return futId;
}
+ /**
+ * @param futId Future ID.
+ */
+ public void futureId(IgniteUuid futId) {
+ this.futId = futId;
+ }
+
/**
* @return Mini-future ID.
*/
@@ -131,6 +161,13 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
return miniId;
}
+ /**
+ * @param miniId Mini-future ID.
+ */
+ public void miniId(IgniteUuid miniId) {
+ this.miniId = miniId;
+ }
+
/**
* @param key Key.
*/
@@ -154,10 +191,10 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws
IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
super.prepareMarshal(ctx);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
+ GridCacheContext<?, ?> cctx = ctx.cacheContext(cacheId);
if (missedKeys != null)
prepareMarshalCacheObjects(missedKeys, cctx);
@@ -166,16 +203,13 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
for (GridCacheEntryInfo info : infos)
info.marshal(cctx.cacheObjectContext());
}
-
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx, err);
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
+ GridCacheContext<?, ?> cctx = ctx.cacheContext(cacheId);
if (missedKeys != null)
finishUnmarshalCacheObjects(missedKeys, cctx, ldr);
@@ -184,9 +218,6 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
for (GridCacheEntryInfo info : infos)
info.unmarshal(cctx.cacheObjectContext(), ldr);
}
-
- if (errBytes != null && err == null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
}
/** {@inheritDoc} */
@@ -194,109 +225,6 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
return addDepInfo;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeIgniteUuid(futId))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection(infos,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeIgniteUuid(miniId))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeCollection(missedKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- futId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- infos = reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- miniId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- missedKeys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 43;