This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6de2ff33819 IGNITE-26534 Refactor GridDhtLockResponse and successors
(#12377)
6de2ff33819 is described below
commit 6de2ff3381986eaffbd9e26b2b1c62bef65ca8f4
Author: Ilya Shishkov <[email protected]>
AuthorDate: Fri Oct 10 09:46:51 2025 +0300
IGNITE-26534 Refactor GridDhtLockResponse and successors (#12377)
---
.../internal/MessageSerializerGenerator.java | 3 +
.../apache/ignite/internal/util/lang/GridFunc.java | 7 +
.../managers/communication/ErrorMessage.java | 105 ++++++++++
.../communication/GridIoMessageFactory.java | 12 +-
.../distributed/GridDistributedLockResponse.java | 141 ++++---------
.../cache/distributed/dht/GridDhtLockFuture.java | 2 +-
.../cache/distributed/dht/GridDhtLockResponse.java | 121 +++--------
.../distributed/near/GridNearLockResponse.java | 232 ++++++++-------------
.../internal/codegen/MessageProcessorTest.java | 9 +
.../communication/ErrorMessageSelfTest.java | 68 ++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 4 +-
.../test/resources/codegen/ExceptionMessage.java | 38 ++++
12 files changed, 397 insertions(+), 345 deletions(-)
diff --git
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index 3ebf72412d0..ae05a19e5e6 100644
---
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -239,6 +239,9 @@ class MessageSerializerGenerator {
* @param opt Case option.
*/
private void processField(VariableElement field, int opt) throws Exception
{
+ if (assignableFrom(field.asType(), type(Throwable.class.getName())))
+ throw new UnsupportedOperationException("You should use
ErrorMessage for serialization of throwables.");
+
writeField(field, opt);
readField(field, opt);
}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index d1a1c408591..3af938392a4 100755
---
a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2139,4 +2139,11 @@ public class GridFunc {
return Integer.compare(a1.length, a2.length);
}
+
+ /**
+ * @param col Collection.
+ */
+ public static <T> Collection<T> emptyIfNull(@Nullable Collection<T> col) {
+ return col == null ? Collections.emptySet() : col;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
new file mode 100644
index 00000000000..71787abccec
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.MessageProcessor;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.marshaller.Marshallers.jdk;
+
+/**
+ * Message used to transfer {@link Throwable} objects.
+ * <p>Because raw serialization of throwables is prohibited, you should use
this message when it is necessary
+ * to transfer some error as part of some message. See {@link
MessageProcessor} for details.
+ * <p>Currently, under the hood marshalling and unmarshalling is performed by
{@link JdkMarshaller}.
+ */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+public class ErrorMessage implements Message {
+ /** Serialized form of throwable. */
+ @Order(value = 0, method = "errorBytes")
+ private byte @Nullable [] errBytes;
+
+ /** Original error. It is transient and necessary only to avoid duplicated
serialization and deserializtion. */
+ private @Nullable Throwable err;
+
+ /**
+ * Default constructor.
+ */
+ public ErrorMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param err Original error. Will be lazily serialized.
+ */
+ public ErrorMessage(@Nullable Throwable err) {
+ this.err = err;
+ }
+
+ /**
+ * @return Serialized form of throwable.
+ */
+ public byte @Nullable [] errorBytes() {
+ try {
+ if (errBytes == null && err != null)
+ errBytes = U.marshal(jdk(), err);
+
+ return errBytes;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @param errBytes New serialized form of throwable.
+ */
+ public void errorBytes(byte @Nullable [] errBytes) {
+ this.errBytes = errBytes;
+ }
+
+ /**
+ * @return Original {@link Throwable}.
+ */
+ public @Nullable Throwable toThrowable() {
+ try {
+ if (err == null && errBytes != null) {
+ err = U.unmarshal(jdk(), errBytes, U.gridClassLoader());
+
+ // It is not necessary now.
+ errBytes = null;
+ }
+
+ return err;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -100;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 48cbc18fece..d2fd67ae44c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
import
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
+import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
import
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
import org.apache.ignite.internal.codegen.GridCacheEntryInfoSerializer;
import org.apache.ignite.internal.codegen.GridCacheSqlQuerySerializer;
@@ -44,6 +45,7 @@ import
org.apache.ignite.internal.codegen.GridDeploymentResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtAtomicNearResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtForceKeysRequestSerializer;
+import org.apache.ignite.internal.codegen.GridDhtLockResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer;
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
@@ -51,12 +53,14 @@ import
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializ
import
org.apache.ignite.internal.codegen.GridDhtTxOnePhaseCommitAckRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtTxPrepareRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtUnlockRequestSerializer;
+import
org.apache.ignite.internal.codegen.GridDistributedLockResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicUpdateResponseSerializer;
import org.apache.ignite.internal.codegen.GridNearGetRequestSerializer;
+import org.apache.ignite.internal.codegen.GridNearLockResponseSerializer;
import org.apache.ignite.internal.codegen.GridNearTxPrepareRequestSerializer;
import org.apache.ignite.internal.codegen.GridNearUnlockRequestSerializer;
import org.apache.ignite.internal.codegen.GridQueryCancelRequestSerializer;
@@ -234,7 +238,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
// -54 is reserved for SQL.
- // -46 ... -51 - snapshot messages.
+ factory.register((short)-100, ErrorMessage::new, new
ErrorMessageSerializer());
factory.register((short)-61, IgniteDiagnosticMessage::new);
factory.register((short)-53, SchemaOperationStatusMessage::new, new
SchemaOperationStatusMessageSerializer());
factory.register((short)-51, NearCacheUpdates::new, new
NearCacheUpdatesSerializer());
@@ -271,7 +275,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)17, GridCacheTxRecoveryResponse::new, new
GridCacheTxRecoveryResponseSerializer());
factory.register((short)20, GridCacheTtlUpdateRequest::new, new
GridCacheTtlUpdateRequestSerializer());
factory.register((short)21, GridDistributedLockRequest::new);
- factory.register((short)22, GridDistributedLockResponse::new);
+ factory.register((short)22, GridDistributedLockResponse::new, new
GridDistributedLockResponseSerializer());
factory.register((short)23, GridDistributedTxFinishRequest::new);
factory.register((short)24, GridDistributedTxFinishResponse::new);
factory.register((short)25, GridDistributedTxPrepareRequest::new, new
GridDistributedTxPrepareRequestSerializer());
@@ -280,7 +284,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)28, GridDhtAffinityAssignmentRequest::new, new
GridDhtAffinityAssignmentRequestSerializer());
factory.register((short)29, GridDhtAffinityAssignmentResponse::new);
factory.register((short)30, GridDhtLockRequest::new);
- factory.register((short)31, GridDhtLockResponse::new);
+ factory.register((short)31, GridDhtLockResponse::new, new
GridDhtLockResponseSerializer());
factory.register((short)32, GridDhtTxFinishRequest::new);
factory.register((short)33, GridDhtTxFinishResponse::new);
factory.register((short)34, GridDhtTxPrepareRequest::new, new
GridDhtTxPrepareRequestSerializer());
@@ -300,7 +304,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)49, GridNearGetRequest::new, new
GridNearGetRequestSerializer());
factory.register((short)50, GridNearGetResponse::new);
factory.register((short)51, GridNearLockRequest::new);
- factory.register((short)52, GridNearLockResponse::new);
+ factory.register((short)52, GridNearLockResponse::new, new
GridNearLockResponseSerializer());
factory.register((short)53, GridNearTxFinishRequest::new);
factory.register((short)54, GridNearTxFinishResponse::new);
factory.register((short)55, GridNearTxPrepareRequest::new, new
GridNearTxPrepareRequestSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index 19b9a9bdea5..8e01bb710c0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -17,43 +17,37 @@
package org.apache.ignite.internal.processors.cache.distributed;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
* Lock response message.
*/
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public class GridDistributedLockResponse extends GridDistributedBaseMessage {
/** Future ID. */
+ @Order(value = 7, method = "futureId")
private IgniteUuid futId;
/** Error. */
- @GridDirectTransient
- private Throwable err;
-
- /** Serialized error. */
- private byte[] errBytes;
+ @Order(value = 8, method = "errorMessage")
+ private ErrorMessage errMsg;
/** Values. */
@GridToStringInclude
- @GridDirectCollection(CacheObject.class)
+ @Order(value = 9, method = "values")
private List<CacheObject> vals;
/**
@@ -103,7 +97,7 @@ public class GridDistributedLockResponse extends
GridDistributedBaseMessage {
this.cacheId = cacheId;
this.futId = futId;
- this.err = err;
+ errMsg = new ErrorMessage(err);
}
/**
@@ -126,7 +120,7 @@ public class GridDistributedLockResponse extends
GridDistributedBaseMessage {
this.cacheId = cacheId;
this.futId = futId;
- this.err = err;
+ errMsg = new ErrorMessage(err);
vals = new ArrayList<>(cnt);
}
@@ -139,16 +133,44 @@ public class GridDistributedLockResponse extends
GridDistributedBaseMessage {
return futId;
}
+ /**
+ * @param futId New future ID.
+ */
+ public void futureId(IgniteUuid futId) {
+ this.futId = futId;
+ }
+
/** {@inheritDoc} */
@Override public Throwable error() {
- return err;
+ return errMsg != null ? errMsg.toThrowable() : null;
}
/**
- * @param err Error to set.
+ * @return Error message.
*/
- public void error(Throwable err) {
- this.err = err;
+ public ErrorMessage errorMessage() {
+ return errMsg;
+ }
+
+ /**
+ * @param errMsg New error message.
+ */
+ public void errorMessage(ErrorMessage errMsg) {
+ this.errMsg = errMsg;
+ }
+
+ /**
+ * @return Values.
+ */
+ public List<CacheObject> values() {
+ return vals;
+ }
+
+ /**
+ * @param vals New values.
+ */
+ public void values(List<CacheObject> vals) {
+ this.vals = vals;
}
/**
@@ -187,9 +209,6 @@ public class GridDistributedLockResponse extends
GridDistributedBaseMessage {
super.prepareMarshal(ctx);
prepareMarshalCacheObjects(vals, ctx.cacheContext(cacheId));
-
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx.marshaller(), err);
}
/** {@inheritDoc} */
@@ -197,84 +216,6 @@ public class GridDistributedLockResponse extends
GridDistributedBaseMessage {
super.finishUnmarshal(ctx, ldr);
finishUnmarshalCacheObjects(vals, ctx.cacheContext(cacheId), ldr);
-
- if (errBytes != null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 7:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeIgniteUuid(futId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeCollection(vals,
MessageCollectionItemType.CACHE_OBJECT))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 7:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- futId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- vals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index ff2f104386f..d139f6523cb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1339,7 +1339,7 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
if (checkDone())
return;
- for (GridCacheEntryInfo info : res.preloadEntries()) {
+ for (GridCacheEntryInfo info :
F.emptyIfNull(res.preloadEntries())) {
try {
GridCacheEntryEx entry =
cache0.entryEx(info.key(), topVer);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
index 622e222cc27..30e7ad57541 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
@@ -17,14 +17,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -33,24 +31,23 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* DHT cache lock response.
*/
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public class GridDhtLockResponse extends GridDistributedLockResponse {
- /** Mini ID. */
+ /** Mini future ID. */
+ @Order(10)
private IgniteUuid miniId;
/** Invalid partitions. */
@GridToStringInclude
- @GridDirectCollection(int.class)
+ @Order(value = 11, method = "invalidPartitions")
private Collection<Integer> invalidParts;
- /** Preload entries. */
- @GridDirectCollection(GridCacheEntryInfo.class)
+ /** Preload entries returned from backup. */
+ @Order(12)
private List<GridCacheEntryInfo> preloadEntries;
/**
@@ -99,6 +96,13 @@ public class GridDhtLockResponse extends
GridDistributedLockResponse {
return miniId;
}
+ /**
+ * @param miniId New mini future ID.
+ */
+ public void miniId(IgniteUuid miniId) {
+ this.miniId = miniId;
+ }
+
/**
* @param part Invalid partition.
*/
@@ -113,7 +117,14 @@ public class GridDhtLockResponse extends
GridDistributedLockResponse {
* @return Invalid partitions.
*/
public Collection<Integer> invalidPartitions() {
- return invalidParts == null ? Collections.emptySet() : invalidParts;
+ return invalidParts;
+ }
+
+ /**
+ * @param invalidParts New invalid partitions.
+ */
+ public void invalidPartitions(Collection<Integer> invalidParts) {
+ this.invalidParts = invalidParts;
}
/**
@@ -129,12 +140,17 @@ public class GridDhtLockResponse extends
GridDistributedLockResponse {
}
/**
- * Gets preload entries returned from backup.
- *
- * @return Collection of preload entries.
+ * @return Preload entries returned from backup.
*/
public Collection<GridCacheEntryInfo> preloadEntries() {
- return preloadEntries == null ? Collections.emptyList() :
preloadEntries;
+ return preloadEntries;
+ }
+
+ /**
+ * @param preloadEntries New preload entries returned from backup.
+ */
+ public void preloadEntries(List<GridCacheEntryInfo> preloadEntries) {
+ this.preloadEntries = preloadEntries;
}
/** {@inheritDoc} */
@@ -155,81 +171,6 @@ public class GridDhtLockResponse extends
GridDistributedLockResponse {
unmarshalInfos(preloadEntries, ctx.cacheContext(cacheId), ldr);
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 10:
- if (!writer.writeCollection(invalidParts,
MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeIgniteUuid(miniId))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeCollection(preloadEntries,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 10:
- invalidParts =
reader.readCollection(MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- miniId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- preloadEntries =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 31;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index d74ce684415..a8e6173a6aa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -17,10 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
-import java.nio.ByteBuffer;
import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockResponse;
@@ -28,38 +27,45 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
* Near cache lock response.
*/
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public class GridNearLockResponse extends GridDistributedLockResponse {
- /** Collection of versions that are pending and less than lock version. */
+ /** Pending versions that are less than {@link #version()}. */
@GridToStringInclude
- @GridDirectCollection(GridCacheVersion.class)
+ @Order(10)
private Collection<GridCacheVersion> pending;
- /** */
+ /** Mini future ID. */
+ @Order(11)
private int miniId;
/** DHT versions. */
@GridToStringInclude
+ @Order(value = 12, method = "dhtVersions")
private GridCacheVersion[] dhtVers;
/** DHT candidate versions. */
@GridToStringInclude
+ @Order(value = 13, method = "mappedVersions")
private GridCacheVersion[] mappedVers;
/** Filter evaluation results for fast-commit transactions. */
+ @Order(value = 14, method = "filterResults")
private boolean[] filterRes;
- /** Set if client node should remap lock request. */
+ /** Topology version, which is set when client node should remap lock
request. */
+ @Order(value = 15, method = "clientRemapVersion")
private AffinityTopologyVersion clientRemapVer;
- /** {@code True} if remap version is compatible with current version. Used
together with clientRemapVer. */
+ /**
+ * Flag, indicating whether remap version is compatible with current
version.
+ * Used together with clientRemapVer.
+ */
+ @Order(value = 16, method = "compatibleRemapVersion")
private boolean compatibleRemapVer;
/**
@@ -111,32 +117,42 @@ public class GridNearLockResponse extends
GridDistributedLockResponse {
}
/**
- * @return {@code True} if client node should remap lock request.
+ * @return Topology version, which is set when client node should remap
lock request.
*/
@Nullable public AffinityTopologyVersion clientRemapVersion() {
return clientRemapVer;
}
/**
- * @return {@code True} is remap version is compatible with current
topology version.
+ * @param clientRemapVer New topology version, which is set when client
node should remap lock request.
+ */
+ public void clientRemapVersion(AffinityTopologyVersion clientRemapVer) {
+ this.clientRemapVer = clientRemapVer;
+ }
+
+ /**
+ * @return Flag, indicating whether remap version is compatible with
current version.
*/
public boolean compatibleRemapVersion() {
return compatibleRemapVer;
}
/**
- * Gets pending versions that are less than {@link #version()}.
- *
- * @return Pending versions.
+ * @param compatibleRemapVer New flag, indicating whether remap version is
compatible with current version.
+ */
+ public void compatibleRemapVersion(boolean compatibleRemapVer) {
+ this.compatibleRemapVer = compatibleRemapVer;
+ }
+
+ /**
+ * @return Pending versions that are less than {@link #version()}.
*/
public Collection<GridCacheVersion> pending() {
return pending;
}
/**
- * Sets pending versions that are less than {@link #version()}.
- *
- * @param pending Pending versions.
+ * @param pending New pending versions that are less than {@link
#version()}.
*/
public void pending(Collection<GridCacheVersion> pending) {
this.pending = pending;
@@ -149,6 +165,13 @@ public class GridNearLockResponse extends
GridDistributedLockResponse {
return miniId;
}
+ /**
+ * @param miniId New mini future ID.
+ */
+ public void miniId(int miniId) {
+ this.miniId = miniId;
+ }
+
/**
* @param idx Index.
* @return DHT version.
@@ -157,6 +180,20 @@ public class GridNearLockResponse extends
GridDistributedLockResponse {
return dhtVers == null ? null : dhtVers[idx];
}
+ /**
+ * @return DHT versions.
+ */
+ public GridCacheVersion[] dhtVersions() {
+ return dhtVers;
+ }
+
+ /**
+ * @param dhtVers New DHT versions.
+ */
+ public void dhtVersions(GridCacheVersion[] dhtVers) {
+ this.dhtVers = dhtVers;
+ }
+
/**
* Returns DHT candidate version for acquired near lock on DHT node.
*
@@ -167,6 +204,20 @@ public class GridNearLockResponse extends
GridDistributedLockResponse {
return mappedVers == null ? null : mappedVers[idx];
}
+ /**
+ * @return DHT candidate versions.
+ */
+ public GridCacheVersion[] mappedVersions() {
+ return mappedVers;
+ }
+
+ /**
+ * @param mappedVers New DHT candidate versions.
+ */
+ public void mappedVersions(GridCacheVersion[] mappedVers) {
+ this.mappedVers = mappedVers;
+ }
+
/**
* Gets filter evaluation result for fast-commit transaction.
*
@@ -179,6 +230,20 @@ public class GridNearLockResponse extends
GridDistributedLockResponse {
return filterRes[idx];
}
+ /**
+ * @return Filter evaluation results for fast-commit transactions.
+ */
+ public boolean[] filterResults() {
+ return filterRes;
+ }
+
+ /**
+ * @param filterRes New filter evaluation results for fast-commit
transactions.
+ */
+ public void filterResults(boolean[] filterRes) {
+ this.filterRes = filterRes;
+ }
+
/**
* @param val Value.
* @param filterPassed Boolean flag indicating whether filter passed for
fast-commit transaction.
@@ -204,137 +269,6 @@ public class GridNearLockResponse extends
GridDistributedLockResponse {
addValue(val);
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 10:
- if (!writer.writeAffinityTopologyVersion(clientRemapVer))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeBoolean(compatibleRemapVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeObjectArray(dhtVers,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeBooleanArray(filterRes))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeObjectArray(mappedVers,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeInt(miniId))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeCollection(pending,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 10:
- clientRemapVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- compatibleRemapVer = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- dhtVers =
reader.readObjectArray(MessageCollectionItemType.MSG, GridCacheVersion.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- filterRes = reader.readBooleanArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- mappedVers =
reader.readObjectArray(MessageCollectionItemType.MSG, GridCacheVersion.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- miniId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- pending = reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 52;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
index f83be0324b1..cb343e5ddab 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
@@ -161,6 +161,15 @@ public class MessageProcessorTest {
assertThat(compilation).failed();
}
+ /** */
+ @Test
+ public void testExceptionFailed() {
+ Compilation compilation = compile("ExceptionMessage.java");
+
+ assertThat(compilation).failed();
+ assertThat(compilation).hadErrorContaining("You should use
ErrorMessage for serialization of throwables.");
+ }
+
/** */
private Compilation compile(String... srcFiles) {
List<JavaFileObject> input = new ArrayList<>();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/ErrorMessageSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/ErrorMessageSelfTest.java
new file mode 100644
index 00000000000..2de636d6088
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/ErrorMessageSelfTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.X;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/** Test for {@link ErrorMessage}. */
+public class ErrorMessageSelfTest {
+ /** */
+ @Test
+ public void testDirectAndInsverseConversion() {
+ IgniteException e = new IgniteException("Test exception", new
IgniteCheckedException("Test cause"));
+
+ ErrorMessage msg0 = new ErrorMessage(e);
+
+ assertSame(e, msg0.toThrowable());
+
+ byte[] errBytes = msg0.errorBytes();
+
+ assertNotNull(errBytes);
+
+ ErrorMessage msg1 = new ErrorMessage();
+ msg1.errorBytes(errBytes);
+
+ Throwable t = msg1.toThrowable();
+
+ assertNotNull(t);
+ assertTrue(X.hasCause(t, "Test exception", IgniteException.class));
+ assertTrue(X.hasCause(t, "Test cause", IgniteCheckedException.class));
+ }
+
+ /** */
+ @Test
+ public void testNull() {
+ assertNull(new ErrorMessage(null).toThrowable());
+ assertNull(new ErrorMessage(null).errorBytes());
+
+ ErrorMessage msg = new ErrorMessage();
+
+ msg.errorBytes(null);
+
+ assertNull(msg.toThrowable());
+ assertNull(msg.errorBytes());
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index a2a7ee30dd2..b561782147e 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -37,6 +37,7 @@ import
org.apache.ignite.internal.IgniteLocalNodeMapBeforeStartTest;
import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest;
import org.apache.ignite.internal.TransactionsMXBeanImplTest;
import org.apache.ignite.internal.codegen.MessageProcessorTest;
+import org.apache.ignite.internal.managers.communication.ErrorMessageSelfTest;
import
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2Test;
import
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2TestNoOptimizations;
import
org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest;
@@ -143,7 +144,8 @@ import org.junit.runners.Suite;
JavaVersionCommandParserTest.class,
ClientSessionOutboundQueueLimitTest.class,
- MessageProcessorTest.class
+ MessageProcessorTest.class,
+ ErrorMessageSelfTest.class,
})
public class IgniteBasicTestSuite {
}
diff --git a/modules/core/src/test/resources/codegen/ExceptionMessage.java
b/modules/core/src/test/resources/codegen/ExceptionMessage.java
new file mode 100644
index 00000000000..a7a2983e4b5
--- /dev/null
+++ b/modules/core/src/test/resources/codegen/ExceptionMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class ExceptionMessage implements Message {
+ @Order(0)
+ private IgniteException error;
+
+ public IgniteException error() {
+ return error;
+ }
+
+ public void error(IgniteException error) {
+ this.error = error;
+ }
+
+ public short directType() {
+ return 0;
+ }
+}