This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fa4095d6800 IGNITE-28594 Use Message DTO for
DiscoveryDataBag#JoiningNodeDiscoveryData (#13067)
fa4095d6800 is described below
commit fa4095d6800971bc5549073f609606aceebb3454
Author: Ilya Shishkov <[email protected]>
AuthorDate: Mon May 18 16:50:02 2026 +0300
IGNITE-28594 Use Message DTO for DiscoveryDataBag#JoiningNodeDiscoveryData
(#13067)
---
.../ignite/internal/CoreMessagesProvider.java | 19 ++++-
.../ignite/internal/GridPluginComponent.java | 2 +-
.../managers/encryption/GridEncryptionManager.java | 45 +----------
.../managers/encryption/NodeEncryptionKeys.java | 69 ++++++++++++++++
.../cache/ValidationOnNodeJoinUtils.java | 2 +-
.../cache/binary/BinaryMetadataTransport.java | 20 +----
.../cache/binary/BinaryMetadataVersionInfo.java | 31 +++-----
.../cache/binary/BinaryMetadataVersionsData.java | 39 +++++++++
.../binary/CacheObjectBinaryProcessorImpl.java | 17 ++--
.../cluster/GridClusterStateProcessor.java | 37 +--------
.../continuous/GridContinuousProcessor.java | 5 +-
.../marshaller/GridMarshallerMappingProcessor.java | 6 +-
.../internal/processors/marshaller/MappedName.java | 13 ++-
.../marshaller/MarshallerMappingsData.java | 40 ++++++++++
.../persistence/DistributedMetaStorageImpl.java | 53 ++----------
.../processors/plugin/IgnitePluginProcessor.java | 2 +-
.../processors/query/GridQueryProcessor.java | 21 ++---
.../internal/processors/query/InlineSizesData.java | 39 +++++++++
.../processors/service/IgniteServiceProcessor.java | 18 ++++-
.../ignite/spi/discovery/DiscoveryDataBag.java | 30 +++++--
.../apache/ignite/spi/discovery/ObjectData.java | 82 +++++++++++++++++++
.../ignite/spi/discovery/tcp/ServerImpl.java | 17 +---
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 8 +-
.../tcp/internal/DiscoveryDataPacket.java | 93 ++++------------------
.../tcp/messages/TcpDiscoveryNodeAddedMessage.java | 9 ---
.../main/resources/META-INF/classnames.properties | 2 +-
.../DistributedMetaStoragePersistentTest.java | 11 +--
.../zk/internal/DiscoveryMessageParser.java | 13 ++-
.../discovery/zk/internal/ZkJoiningNodeData.java | 6 +-
.../zk/internal/ZookeeperDiscoveryImpl.java | 10 ++-
30 files changed, 418 insertions(+), 341 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index f1f7883ea19..460cb7bcc10 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -41,6 +41,7 @@ import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyReque
import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
+import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys;
import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
import
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
import org.apache.ignite.internal.processors.authentication.User;
@@ -71,6 +72,7 @@ import
org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import org.apache.ignite.internal.processors.cache.WalStateFinishMessage;
import org.apache.ignite.internal.processors.cache.WalStateProposeMessage;
import
org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionInfo;
+import
org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionsData;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
@@ -193,15 +195,18 @@ import
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMess
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingsData;
import
org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
import
org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage;
+import org.apache.ignite.internal.processors.query.InlineSizesData;
import org.apache.ignite.internal.processors.query.QueryField;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -250,6 +255,7 @@ import
org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
import
org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
+import org.apache.ignite.spi.discovery.ObjectData;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
@@ -351,6 +357,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(GridCacheVersion.class);
withNoSchema(GridCacheVersionEx.class);
withNoSchema(WALPointer.class);
+ withNoSchemaResolvedClassLoader(ObjectData.class);
// [5700 - 5900]: Discovery originated messages.
msgIdx = 5700;
@@ -572,6 +579,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(StatisticsResponse.class);
withNoSchema(CacheContinuousQueryBatchAck.class);
withSchema(CacheContinuousQueryEntry.class);
+ withNoSchema(InlineSizesData.class);
// [11200 - 11300]: Compute, distributed process messages.
msgIdx = 11200;
@@ -636,7 +644,10 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(MetadataRequestMessage.class);
withNoSchema(MetadataResponseMessage.class);
withNoSchema(MarshallerMappingItem.class);
- withNoSchema(BinaryMetadataVersionInfo.class);
+ withSchemaResolvedClassLoader(BinaryMetadataVersionInfo.class);
+ withNoSchema(BinaryMetadataVersionsData.class);
+ withNoSchema(MappedName.class);
+ withNoSchema(MarshallerMappingsData.class);
// [12400 - 12500]: Encryption messages.
msgIdx = 12400;
@@ -645,6 +656,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(ChangeCacheEncryptionRequest.class);
withNoSchema(MasterKeyChangeRequest.class);
withNoSchema(GroupKeyEncrypted.class);
+ withNoSchema(NodeEncryptionKeys.class);
// [13000 - 13300]: Control, configuration, diagnostincs and other
messages.
msgIdx = 13000;
@@ -677,6 +689,11 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
register(cls, dfltMarsh, resolvedClsLdr);
}
+ /** Registers message using {@link #schemaAwareMarsh} and {@link
#resolvedClsLdr}. */
+ private <T extends Message> void withSchemaResolvedClassLoader(Class<T>
cls) {
+ register(cls, schemaAwareMarsh, resolvedClsLdr);
+ }
+
/** Registers message using incrementing {@link #msgIdx} as the message
id/type. */
private <T extends Message> void register(Class<T> cls, Marshaller marsh,
ClassLoader clsLrd) {
register(factory, cls, msgIdx++, marsh, clsLrd);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index 56f292f407b..f0e1a7c7b62 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -115,7 +115,7 @@ public class GridPluginComponent implements GridComponent {
@Nullable @Override public IgniteNodeValidationResult
validateNode(ClusterNode node,
JoiningNodeDiscoveryData discoData) {
try {
- Map<String, Serializable> map = (Map<String,
Serializable>)discoData.joiningNodeData();
+ Map<String, Serializable> map = discoData.joiningNodeData();
if (map != null)
plugin.validateNewNode(node, map.get(plugin.name()));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index e20b1effd18..19c58d5d7b8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -431,9 +431,9 @@ public class GridEncryptionManager extends
GridManagerAdapter<EncryptionSpi> imp
"Cache group key change is in progress! Node join is
rejected.");
}
- NodeEncryptionKeys nodeEncKeys =
(NodeEncryptionKeys)discoData.joiningNodeData();
+ NodeEncryptionKeys nodeEncKeys = discoData.joiningNodeData();
- if (!discoData.hasJoiningNodeData() || nodeEncKeys == null) {
+ if (nodeEncKeys == null) {
return new IgniteNodeValidationResult(ctx.localNodeId(),
"Joining node doesn't have encryption data [node=" + node.id()
+ "]",
"Joining node doesn't have encryption data.");
@@ -522,7 +522,7 @@ public class GridEncryptionManager extends
GridManagerAdapter<EncryptionSpi> imp
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData
data) {
- NodeEncryptionKeys nodeEncryptionKeys =
(NodeEncryptionKeys)data.joiningNodeData();
+ NodeEncryptionKeys nodeEncryptionKeys = data.joiningNodeData();
if (nodeEncryptionKeys == null || nodeEncryptionKeys.newKeys == null
|| ctx.clientNode())
return;
@@ -1748,45 +1748,6 @@ public class GridEncryptionManager extends
GridManagerAdapter<EncryptionSpi> imp
});
}
- /** */
- protected static class NodeEncryptionKeys implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- NodeEncryptionKeys(
- HashMap<Integer, List<GroupKeyEncrypted>> knownKeysWithIds,
- Map<Integer, byte[]> newKeys,
- byte[] masterKeyDigest
- ) {
- this.newKeys = newKeys;
- this.masterKeyDigest = masterKeyDigest;
-
- if (F.isEmpty(knownKeysWithIds))
- return;
-
- // To be able to join the old cluster.
- knownKeys = U.newHashMap(knownKeysWithIds.size());
-
- for (Map.Entry<Integer, List<GroupKeyEncrypted>> entry :
knownKeysWithIds.entrySet())
- knownKeys.put(entry.getKey(), entry.getValue().get(0).key());
-
- this.knownKeysWithIds = knownKeysWithIds;
- }
-
- /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node
(in compatible format). */
- Map<Integer, byte[]> knownKeys;
-
- /** New keys i.e. keys for a local statically configured caches. */
- Map<Integer, byte[]> newKeys;
-
- /** Master key digest. */
- byte[] masterKeyDigest;
-
- /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node.
*/
- Map<Integer, List<GroupKeyEncrypted>> knownKeysWithIds;
- }
-
/** */
private class GenerateEncryptionKeyFuture extends
GridFutureAdapter<T2<Collection<byte[]>, byte[]>> {
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java
new file mode 100644
index 00000000000..c2583de3b04
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** */
+public class NodeEncryptionKeys implements Message {
+ /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node (in
compatible format). */
+ @Order(0)
+ Map<Integer, byte[]> knownKeys;
+
+ /** New keys i.e. keys for a local statically configured caches. */
+ @Order(1)
+ Map<Integer, byte[]> newKeys;
+
+ /** Master key digest. */
+ @Order(2)
+ byte[] masterKeyDigest;
+
+ /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */
+ @Order(3)
+ Map<Integer, List<GroupKeyEncrypted>> knownKeysWithIds;
+
+ /** */
+ public NodeEncryptionKeys() {}
+
+ /** */
+ NodeEncryptionKeys(
+ HashMap<Integer, List<GroupKeyEncrypted>> knownKeysWithIds,
+ Map<Integer, byte[]> newKeys,
+ byte[] masterKeyDigest
+ ) {
+ this.newKeys = newKeys;
+ this.masterKeyDigest = masterKeyDigest;
+
+ if (F.isEmpty(knownKeysWithIds))
+ return;
+
+ // To be able to join the old cluster.
+ knownKeys = U.newHashMap(knownKeysWithIds.size());
+
+ for (Map.Entry<Integer, List<GroupKeyEncrypted>> entry :
knownKeysWithIds.entrySet())
+ knownKeys.put(entry.getKey(), entry.getValue().get(0).key());
+
+ this.knownKeysWithIds = knownKeysWithIds;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
index 4f72dc259d8..ac8d97b2a12 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
@@ -124,7 +124,7 @@ public class ValidationOnNodeJoinUtils {
Function<String, DynamicCacheDescriptor> cacheDescProvider
) {
if (discoData.hasJoiningNodeData() && discoData.joiningNodeData()
instanceof CacheJoinNodeDiscoveryData) {
- CacheJoinNodeDiscoveryData nodeData =
(CacheJoinNodeDiscoveryData)discoData.joiningNodeData();
+ CacheJoinNodeDiscoveryData nodeData = discoData.joiningNodeData();
boolean isGridActive = ctx.state().clusterState().active();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 0cfb2d96768..5b36542f1d6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -841,15 +841,6 @@ final class BinaryMetadataTransport {
MetadataResponseMessage resp = new MetadataResponseMessage(typeId);
- if (metaVerInfo != null) {
- try {
- metaVerInfo.marshalMetadata();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal binary metadata for
[typeId=" + typeId + ']', e);
- }
- }
-
resp.metadataVersionInfo(metaVerInfo);
try {
@@ -890,16 +881,9 @@ final class BinaryMetadataTransport {
return;
}
- try {
- metaVerInfo.unmarshalMetadata();
-
- casBinaryMetadata(typeId, metaVerInfo);
+ casBinaryMetadata(typeId, metaVerInfo);
- fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
- }
- catch (IgniteCheckedException e) {
- fut.onDone(MetadataUpdateResult.createFailureResult(new
BinaryObjectException(e)));
- }
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java
index 1c71892fd17..9fe6d19da44 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java
@@ -18,12 +18,11 @@ package org.apache.ignite.internal.processors.cache.binary;
import java.io.Serializable;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.Message;
-
-import static org.apache.ignite.marshaller.Marshallers.jdk;
+import org.apache.ignite.marshaller.Marshaller;
/**
* Wrapper for {@link BinaryMetadata} which is stored in metadata local cache
on each node.
@@ -31,7 +30,7 @@ import static org.apache.ignite.marshaller.Marshallers.jdk;
* The version refers solely to the internal protocol for updating
BinaryMetadata and is unknown externally.
* It can be updated dynamically from different nodes and threads on the same
node.
*/
-public final class BinaryMetadataVersionInfo implements Serializable, Message {
+public final class BinaryMetadataVersionInfo implements Serializable,
MarshallableMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -130,24 +129,16 @@ public final class BinaryMetadataVersionInfo implements
Serializable, Message {
return removing;
}
- /**
- * Marshals binary metadata to byte array.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void marshalMetadata() throws IgniteCheckedException {
- if (metadataBytes == null)
- metadataBytes = U.marshal(jdk(), metadata);
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (metadata != null)
+ metadataBytes = U.marshal(marsh, metadata);
}
- /**
- * Unmarshals binary metadata from byte array.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void unmarshalMetadata() throws IgniteCheckedException {
- if (metadata == null && metadataBytes != null) {
- metadata = U.unmarshal(jdk(), metadataBytes, U.gridClassLoader());
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ if (metadataBytes != null) {
+ metadata = U.unmarshal(marsh, metadataBytes, clsLdr);
// It is not required anymore.
metadataBytes = null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
new file mode 100644
index 00000000000..706d2c26f1c
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** */
+public class BinaryMetadataVersionsData implements Message {
+ /** */
+ @Order(0)
+ Map<Integer, BinaryMetadataVersionInfo> data;
+
+ /** */
+ public BinaryMetadataVersionsData() {}
+
+ /**
+ * @param data Data.
+ */
+ public BinaryMetadataVersionsData(Map<Integer, BinaryMetadataVersionInfo>
data) {
+ this.data = Map.copyOf(data);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index a0682d41b41..8867e576ec8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -1394,7 +1394,7 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
if ((res = validateBinaryConfiguration(rmtNode)) != null)
return res;
- return validateBinaryMetadata(rmtNode.id(), (Map<Integer,
BinaryMetadataVersionInfo>)discoData.joiningNodeData());
+ return validateBinaryMetadata(rmtNode.id(),
discoData.joiningNodeData());
}
/** */
@@ -1418,11 +1418,11 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
}
/** */
- private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId,
Map<Integer, BinaryMetadataVersionInfo> newNodeMeta) {
+ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId,
BinaryMetadataVersionsData newNodeMeta) {
if (newNodeMeta == null)
return null;
- for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry :
newNodeMeta.entrySet()) {
+ for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry :
newNodeMeta.data.entrySet()) {
if (!metadataLocCache.containsKey(metaEntry.getKey()))
continue;
@@ -1470,24 +1470,19 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- Map<Integer, BinaryMetadataVersionInfo> res =
U.newHashMap(metadataLocCache.size());
-
- for (Map.Entry<Integer, BinaryMetadataVersionInfo> e :
metadataLocCache.entrySet())
- res.put(e.getKey(), e.getValue());
-
- dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), (Serializable)res);
+ dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), new
BinaryMetadataVersionsData(metadataLocCache));
}
/** {@inheritDoc} */
@Override public void
onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
- Map<Integer, BinaryMetadataVersionInfo> newNodeMeta = (Map<Integer,
BinaryMetadataVersionInfo>)data.joiningNodeData();
+ BinaryMetadataVersionsData newNodeMeta = data.joiningNodeData();
if (newNodeMeta == null)
return;
UUID joiningNode = data.joiningNodeId();
- for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry :
newNodeMeta.entrySet()) {
+ for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry :
newNodeMeta.data.entrySet()) {
if (metadataLocCache.containsKey(metaEntry.getKey())) {
BinaryMetadataVersionInfo locMetaVerInfo =
metadataLocCache.get(metaEntry.getKey());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 4640faf6d8e..337b4f51a70 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -925,14 +925,7 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- try {
- byte[] marshalledState = marsh.marshal(globalState);
-
- dataBag.addJoiningNodeData(discoveryDataType().ordinal(),
marshalledState);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ dataBag.addJoiningNodeData(discoveryDataType().ordinal(), globalState);
}
/** {@inheritDoc} */
@@ -953,20 +946,7 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
return;
}
- DiscoveryDataClusterState joiningNodeState = null;
-
- try {
- if (joiningNodeData.joiningNodeData() != null)
- joiningNodeState = marsh.unmarshal(
- (byte[])joiningNodeData.joiningNodeData(),
- U.resolveClassLoader(ctx.config())
- );
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal disco data from joining node: "
+ joiningNodeData.joiningNodeId());
-
- return;
- }
+ DiscoveryDataClusterState joiningNodeState =
joiningNodeData.joiningNodeData();
BaselineTopologyHistory historyToSend = null;
@@ -1251,18 +1231,7 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
return null;
}
- DiscoveryDataClusterState joiningNodeState;
-
- try {
- joiningNodeState =
marsh.unmarshal((byte[])discoData.joiningNodeData(),
U.resolveClassLoader(ctx.config()));
- }
- catch (IgniteCheckedException e) {
- String msg = "Error on unmarshalling discovery data " +
- "from node " + node.consistentId() + ": " + e.getMessage() +
- "; node is not allowed to join";
-
- return new IgniteNodeValidationResult(node.id(), msg);
- }
+ DiscoveryDataClusterState joiningNodeState =
discoData.joiningNodeData();
if (joiningNodeState == null || joiningNodeState.baselineTopology() ==
null)
return null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7509a88fa17..30bc8175de3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -521,8 +521,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
if (immutableDiscoCustomMsg) {
if (data.hasJoiningNodeData()) {
- ContinuousRoutinesJoiningNodeDiscoveryData nodeData =
(ContinuousRoutinesJoiningNodeDiscoveryData)
- data.joiningNodeData();
+ ContinuousRoutinesJoiningNodeDiscoveryData nodeData =
data.joiningNodeData();
for (ContinuousRoutineInfo routineInfo :
nodeData.startedRoutines) {
routinesInfo.addRoutineInfo(routineInfo);
@@ -533,7 +532,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
}
else {
if (data.hasJoiningNodeData())
-
onDiscoveryDataReceivedMutable((DiscoveryData)data.joiningNodeData());
+ onDiscoveryDataReceivedMutable(data.joiningNodeData());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index f6895734545..8946672364e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -326,7 +326,7 @@ public class GridMarshallerMappingProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- dataBag.addJoiningNodeData(MARSHALLER_PROC.ordinal(),
marshallerCtx.getCachedMappings());
+ dataBag.addJoiningNodeData(MARSHALLER_PROC.ordinal(), new
MarshallerMappingsData(marshallerCtx.getCachedMappings()));
}
/** {@inheritDoc} */
@@ -337,9 +337,9 @@ public class GridMarshallerMappingProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void
onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
- List<Map<Integer, MappedName>> mappings = (List<Map<Integer,
MappedName>>)data.joiningNodeData();
+ MarshallerMappingsData mappingsData = data.joiningNodeData();
- processIncomingMappings(mappings);
+ processIncomingMappings(mappingsData.mappings);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
index eae07d2d01b..4a000fadc23 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
@@ -19,20 +19,27 @@ package org.apache.ignite.internal.processors.marshaller;
import java.io.Serializable;
import java.util.Objects;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Contains mapped class name and boolean flag showing whether this mapping
was accepted by other nodes or not.
*/
-public final class MappedName implements Serializable {
+public final class MappedName implements Serializable, Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- private final String clsName;
+ @Order(0)
+ String clsName;
/** */
- private final boolean accepted;
+ @Order(1)
+ boolean accepted;
+
+ /** */
+ public MappedName() {}
/**
* @param clsName Class name.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java
new file mode 100644
index 00000000000..2207b1c21f4
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** */
+public class MarshallerMappingsData implements Message {
+ /** */
+ @Order(0)
+ List<Map<Integer, MappedName>> mappings;
+
+ /** */
+ public MarshallerMappingsData() {}
+
+ /**
+ * @param mappings Mappings.
+ */
+ public MarshallerMappingsData(List<Map<Integer, MappedName>> mappings) {
+ this.mappings = mappings;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index 2e24715108b..6e67b52250e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -571,14 +571,9 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
EMPTY_ARRAY
);
- try {
- dataBag.addJoiningNodeData(COMPONENT_ID,
marshaller.marshal(data));
+ dataBag.addJoiningNodeData(COMPONENT_ID, data);
- return;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ return;
}
Serializable data = new DistributedMetaStorageJoiningNodeData(
@@ -587,12 +582,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
histCache.toArray()
);
- try {
- dataBag.addJoiningNodeData(COMPONENT_ID,
marshaller.marshal(data));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ dataBag.addJoiningNodeData(COMPONENT_ID, data);
}
finally {
lock.readLock().unlock();
@@ -640,10 +630,10 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
try {
DistributedMetaStorageVersion locVer = ver;
- DistributedMetaStorageJoiningNodeData joiningData =
getJoiningNodeData(discoData);
+ DistributedMetaStorageJoiningNodeData joiningData =
discoData.joiningNodeData();
if (joiningData == null) {
- String errorMsg = "Cannot unmarshal joining node data";
+ String errorMsg = "Empty joining node data";
return new IgniteNodeValidationResult(node.id(), errorMsg);
}
@@ -774,10 +764,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
if (!discoData.hasJoiningNodeData())
return;
- DistributedMetaStorageJoiningNodeData joiningData =
getJoiningNodeData(discoData);
-
- if (joiningData == null)
- return;
+ DistributedMetaStorageJoiningNodeData joiningData =
discoData.joiningNodeData();
DistributedMetaStorageVersion remoteVer = joiningData.ver;
@@ -832,10 +819,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
if (!discoData.hasJoiningNodeData())
return;
- DistributedMetaStorageJoiningNodeData joiningData =
getJoiningNodeData(discoData);
-
- if (joiningData == null)
- return;
+ DistributedMetaStorageJoiningNodeData joiningData =
discoData.joiningNodeData();
DistributedMetaStorageVersion remoteVer = joiningData.ver;
@@ -880,29 +864,6 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
}
}
- /**
- * Retrieve joining node data from discovery data. It is expected that it
is present as a {@code byte[]} object.
- *
- * @param discoData Joining node discovery data.
- * @return Unmarshalled data or null if unmarshalling failed.
- */
- @Nullable private DistributedMetaStorageJoiningNodeData getJoiningNodeData(
- JoiningNodeDiscoveryData discoData
- ) {
- byte[] data = (byte[])discoData.joiningNodeData();
-
- assert data != null;
-
- try {
- return marshaller.unmarshal(data, U.gridClassLoader());
- }
- catch (IgniteCheckedException e) {
- log.error("Unable to unmarshal joinging node data for distributed
metastorage component.", e);
-
- return null;
- }
- }
-
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
assert isClient;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
index 6a8fbf2962e..59bcabd2bfa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
@@ -194,7 +194,7 @@ public class IgnitePluginProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData
data) {
if (data.hasJoiningNodeData()) {
- Map<String, Serializable> pluginsData = (Map<String,
Serializable>)data.joiningNodeData();
+ Map<String, Serializable> pluginsData = data.joiningNodeData();
applyPluginsData(data.joiningNodeId(), pluginsData);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 6df09f83c41..854e8a17e58 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -496,28 +496,19 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void
onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
- if (data.hasJoiningNodeData() && data.joiningNodeData() instanceof
Map) {
- Map<String, Serializable> nodeSpecificDataMap = (Map<String,
Serializable>)data.joiningNodeData();
+ Object joiningNodeData = data.joiningNodeData();
- if (nodeSpecificDataMap.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) {
- Serializable serializable =
nodeSpecificDataMap.get(INLINE_SIZES_DISCO_BAG_KEY);
+ if (joiningNodeData instanceof InlineSizesData) {
+ Map<String, Integer> joiningNodeIndexesInlineSize =
((InlineSizesData)joiningNodeData).sizes;
- assert serializable instanceof Map : serializable;
-
- Map<String, Integer> joiningNodeIndexesInlineSize =
(Map<String, Integer>)serializable;
-
- checkInlineSizes(secondaryIndexesInlineSize(),
joiningNodeIndexesInlineSize, data.joiningNodeId());
- }
+ checkInlineSizes(secondaryIndexesInlineSize(),
joiningNodeIndexesInlineSize, data.joiningNodeId());
}
}
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- HashMap<String, Serializable> dataMap = new HashMap<>();
-
- dataMap.put(INLINE_SIZES_DISCO_BAG_KEY,
collectSecondaryIndexesInlineSize());
-
-
dataBag.addJoiningNodeData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(),
dataMap);
+
dataBag.addJoiningNodeData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(),
+ new InlineSizesData(secondaryIndexesInlineSize()));
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
new file mode 100644
index 00000000000..eb3813501f6
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** */
+public class InlineSizesData implements Message {
+ /** */
+ @Order(0)
+ Map<String, Integer> sizes;
+
+ /** */
+ public InlineSizesData() {}
+
+ /**
+ * @param sizes Inline sizes.
+ */
+ public InlineSizesData(Map<String, Integer> sizes) {
+ this.sizes = sizes;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index 86fa2f2cea9..e7f59549ea4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -418,7 +418,9 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
if (data.joiningNodeData() == null)
return null;
- List<ServiceInfo> svcs =
((ServiceProcessorJoinNodeDiscoveryData)data.joiningNodeData()).services();
+ ServiceProcessorJoinNodeDiscoveryData srvcProcData =
data.joiningNodeData();
+
+ List<ServiceInfo> svcs = srvcProcData.services();
if (ctx.security().enabled()) {
SecurityException err = checkDeployPermissionDuringJoin(node,
svcs);
@@ -429,7 +431,10 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
for (ServiceInfo svc : svcs) {
try {
- unmarshalNodeFilterIfNeeded(svc.configuration());
+ // Returned value is ignored, because we only need to check
possibility of marshalling.
+ // We don't need to save node filter in lazy configuration at
this moment.
+ // Filter will be unmarhshalled and saved during adding node
to topology (see #onGridDataReceived).
+ unmarshalNodeFilter(svc.configuration());
}
catch (IgniteCheckedException e) {
return new IgniteNodeValidationResult(node.id(), "Node join is
rejected [joiningNodeId=" + node.id() +
@@ -445,7 +450,7 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
if (data.joiningNodeData() == null)
return;
- ServiceProcessorJoinNodeDiscoveryData joinData =
(ServiceProcessorJoinNodeDiscoveryData)data.joiningNodeData();
+ ServiceProcessorJoinNodeDiscoveryData joinData =
data.joiningNodeData();
for (ServiceInfo desc : joinData.services()) {
assert desc.topologySnapshot().isEmpty();
@@ -1442,12 +1447,17 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
if (cfg.getNodeFilter() != null)
return;
+ cfg.setNodeFilter(unmarshalNodeFilter(cfg));
+ }
+
+ /** @param cfg Lazy service configuration. */
+ private IgnitePredicate<ClusterNode>
unmarshalNodeFilter(LazyServiceConfiguration cfg) throws IgniteCheckedException
{
GridDeployment dep =
ctx.deploy().getDeployment(cfg.serviceClassName());
ClassLoader clsLdr = U.resolveClassLoader(dep != null ?
dep.classLoader() : null, ctx.config());
try {
- cfg.setNodeFilter(U.unmarshal(marsh, cfg.nodeFilterBytes(),
clsLdr));
+ return U.unmarshal(marsh, cfg.nodeFilterBytes(), clsLdr);
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to unmarshal class of
service node filter [cfg=" + cfg + ']', e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 28fc5f1a7b8..58e41738265 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
@@ -44,8 +45,11 @@ public class DiscoveryDataBag {
/** @return Whether joining node provided discovery data. */
boolean hasJoiningNodeData();
- /** @return Joining node data. */
- Serializable joiningNodeData();
+ /**
+ * @param <T> Data type.
+ * @return Joining node data.
+ */
+ <T> T joiningNodeData();
}
/**
@@ -80,8 +84,10 @@ public class DiscoveryDataBag {
}
/** {@inheritDoc} */
- @Override @Nullable public Serializable joiningNodeData() {
- return joiningNodeData.get(cmpId);
+ @Override @Nullable public <T> T joiningNodeData() {
+ Message dataMsg = joiningNodeData.get(cmpId);
+
+ return dataMsg instanceof ObjectData ? ObjectData.unwrap(dataMsg)
: (T)dataMsg;
}
/**
@@ -158,7 +164,7 @@ public class DiscoveryDataBag {
private Set<Integer> cmnDataInitializedCmps;
/** */
- private Map<Integer, Serializable> joiningNodeData = new HashMap<>();
+ private Map<Integer, Message> joiningNodeData = new HashMap<>();
/** */
private Map<Integer, Serializable> commonData = new HashMap<>();
@@ -237,9 +243,17 @@ public class DiscoveryDataBag {
/**
* @param cmpId Component ID.
- * @param data Data.
+ * @param data Serializable data.
*/
public void addJoiningNodeData(Integer cmpId, Serializable data) {
+ joiningNodeData.put(cmpId, new ObjectData(data));
+ }
+
+ /**
+ * @param cmpId Component ID.
+ * @param data Message data.
+ */
+ public void addJoiningNodeData(Integer cmpId, Message data) {
joiningNodeData.put(cmpId, data);
}
@@ -275,7 +289,7 @@ public class DiscoveryDataBag {
/**
* @param joinNodeData Joining node data.
*/
- public void joiningNodeData(Map<Integer, Serializable> joinNodeData) {
+ public void joiningNodeData(Map<Integer, Message> joinNodeData) {
joiningNodeData.putAll(joinNodeData);
}
@@ -294,7 +308,7 @@ public class DiscoveryDataBag {
}
/** @return Discovery data for each Ignite component that is sent to the
cluster nodes by joining node. */
- public Map<Integer, Serializable> joiningNodeData() {
+ public Map<Integer, Message> joiningNodeData() {
return joiningNodeData;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
new file mode 100644
index 00000000000..f9da59bffe4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Wrapper message for serializable data. */
+public class ObjectData implements MarshallableMessage {
+ /** */
+ @GridToStringInclude
+ private Serializable data;
+
+ /** */
+ @GridToStringExclude
+ @Order(0)
+ byte[] dataBytes;
+
+ /** */
+ public ObjectData() {}
+
+ /**
+ * @param data Original data.
+ */
+ public ObjectData(Serializable data) {
+ this.data = data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (data != null)
+ dataBytes = U.marshal(marsh, data);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ if (dataBytes != null) {
+ data = U.unmarshal(marsh, dataBytes, clsLdr);
+
+ dataBytes = null;
+ }
+ }
+
+ /**
+ * @param msg Message.
+ * @param <T> Type of data.
+ *
+ * @return Original data unwrapped from a message.
+ */
+ public static <T> T unwrap(@Nullable Message msg) {
+ return msg != null ? (T)(((ObjectData)msg).data) : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ObjectData.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0861b7f88be..f11ab667db0 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1982,7 +1982,6 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topology(null);
nodeAddedMsg.topologyHistory(null);
nodeAddedMsg.messages(null);
- nodeAddedMsg.clearUnmarshalledDiscoveryData();
}
}
@@ -4524,19 +4523,9 @@ class ServerImpl extends TcpDiscoveryImpl {
err = spi.getSpiContext().validateNode(node);
if (err == null) {
- try {
- DiscoveryDataBag data =
msg.gridDiscoveryData().unmarshalJoiningNodeData(
- spi.marshaller(),
- U.resolveClassLoader(spi.ignite().configuration()),
- false,
- log
- );
-
- err = spi.getSpiContext().validateNode(node, data);
- }
- catch (IgniteCheckedException e) {
- err = new IgniteNodeValidationResult(node.id(),
e.getMessage());
- }
+ DiscoveryDataBag data =
msg.gridDiscoveryData().bagWithJoiningNodeData();
+
+ err = spi.getSpiContext().validateNode(node, data);
}
if (err != null) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 16d4578101d..6e14dfa8f27 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2081,11 +2081,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
//marshall collected bag into packet, return packet
if (dataPacket.joiningNodeId().equals(locNode.id()))
- dataPacket.marshalJoiningNodeData(
- dataBag,
- marshaller(),
- ignite.configuration().getNetworkCompressionLevel(),
- log);
+ dataPacket.addJoiningNodeData(dataBag);
else
dataPacket.marshalGridNodeData(
dataBag,
@@ -2122,7 +2118,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
}
}
else
- dataBag =
dataPacket.unmarshalJoiningNodeDataSilently(marshaller(), clsLdr,
locNode.clientRouterNodeId() != null, log);
+ dataBag = dataPacket.bagWithJoiningNodeData();
exchange.onExchange(dataBag);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
index 4922e7c0e86..4ded165df70 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
@@ -24,10 +24,11 @@ import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.Compress;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
@@ -53,10 +54,8 @@ public class DiscoveryDataPacket implements Serializable,
Message {
/** */
@Order(1)
- Map<Integer, byte[]> joiningNodeData = new HashMap<>();
-
- /** */
- private transient Map<Integer, Serializable> unmarshalledJoiningNodeData;
+ @Compress
+ Map<Integer, Message> joiningNodeData = new HashMap<>();
/** */
@Order(2)
@@ -114,12 +113,10 @@ public class DiscoveryDataPacket implements Serializable,
Message {
/**
* @param bag Bag.
- * @param marsh Marsh.
- * @param log Logger.
*/
- public void marshalJoiningNodeData(DiscoveryDataBag bag, Marshaller marsh,
- int compressionLevel, IgniteLogger log) {
- marshalData(bag.joiningNodeData(), joiningNodeData, marsh,
compressionLevel, log);
+ public void addJoiningNodeData(DiscoveryDataBag bag) {
+ if (!F.isEmpty(bag.joiningNodeData()))
+ joiningNodeData.putAll(bag.joiningNodeData());
}
/**
@@ -161,67 +158,13 @@ public class DiscoveryDataPacket implements Serializable,
Message {
}
/**
- * @param marsh Marsh.
- * @param clsLdr Class loader.
- * @param clientNode Client node.
- * @param log Logger.
- * @throws IgniteCheckedException If unmarshalling failed.
+ * @return Data bag with joining node data.
*/
- public DiscoveryDataBag unmarshalJoiningNodeData(
- Marshaller marsh,
- ClassLoader clsLdr,
- boolean clientNode,
- IgniteLogger log
- ) throws IgniteCheckedException {
- return unmarshalJoiningNodeData(marsh, clsLdr, clientNode, log, true);
- }
-
- /**
- * @param marsh Marsh.
- * @param clsLdr Class loader.
- * @param clientNode Client node.
- * @param log Logger.
- */
- public DiscoveryDataBag unmarshalJoiningNodeDataSilently(
- Marshaller marsh,
- ClassLoader clsLdr,
- boolean clientNode,
- IgniteLogger log
- ) {
- try {
- return unmarshalJoiningNodeData(marsh, clsLdr, clientNode, log,
false);
- }
- catch (IgniteCheckedException impossible) {
- assert false : impossible;
-
- log.error("Failed to unmarshal joining node data", impossible);
-
- throw new IgniteException(impossible);
- }
- }
-
- /**
- * @param marsh Marsh.
- * @param clsLdr Class loader.
- * @param clientNode Client node.
- * @param log Logger.
- * @param panic Throw unmarshalling if {@code true}.
- * @throws IgniteCheckedException If {@code panic} is {@code true} and
unmarshalling failed.
- */
- private DiscoveryDataBag unmarshalJoiningNodeData(
- Marshaller marsh,
- ClassLoader clsLdr,
- boolean clientNode,
- IgniteLogger log,
- boolean panic
- ) throws IgniteCheckedException {
+ public DiscoveryDataBag bagWithJoiningNodeData() {
DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId,
joiningNodeClient);
- if (joiningNodeData != null && !joiningNodeData.isEmpty()) {
- unmarshalledJoiningNodeData = unmarshalData(joiningNodeData,
marsh, clsLdr, clientNode, log, panic);
-
- dataBag.joiningNodeData(unmarshalledJoiningNodeData);
- }
+ if (!F.isEmpty(joiningNodeData))
+ dataBag.joiningNodeData(joiningNodeData);
return dataBag;
}
@@ -230,7 +173,7 @@ public class DiscoveryDataPacket implements Serializable,
Message {
*
*/
public boolean hasJoiningNodeData() {
- return joiningNodeData != null && !joiningNodeData.isEmpty();
+ return !F.isEmpty(joiningNodeData);
}
/**
@@ -443,8 +386,8 @@ public class DiscoveryDataPacket implements Serializable,
Message {
public DiscoveryDataBag bagForDataCollection() {
DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId,
commonData.keySet(), joiningNodeClient);
- if (unmarshalledJoiningNodeData != null)
- dataBag.joiningNodeData(unmarshalledJoiningNodeData);
+ if (joiningNodeData != null)
+ dataBag.joiningNodeData(joiningNodeData);
return dataBag;
}
@@ -455,12 +398,4 @@ public class DiscoveryDataPacket implements Serializable,
Message {
public void joiningNodeClient(boolean joiningNodeClient) {
this.joiningNodeClient = joiningNodeClient;
}
-
- /**
- * Clears {@link #unmarshalledJoiningNodeData}
- */
- public void clearUnmarshalledJoiningNodeData() {
- unmarshalledJoiningNodeData = null;
- }
-
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index ed717fcbbc0..b810ac6b322 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -195,15 +195,6 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
dataPacket = null;
}
- /**
- * Clears unmarshalled discovery data to minimize message size.
- * These data are used only on "collect" stage and are not part of
persistent state.
- */
- public void clearUnmarshalledDiscoveryData() {
- if (dataPacket != null)
- dataPacket.clearUnmarshalledJoiningNodeData();
- }
-
/** @return First grid node start time. */
public long gridStartTime() {
return gridStartTime;
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 954755b375a..f6acd558984 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -737,7 +737,7 @@
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse
org.apache.ignite.internal.managers.encryption.GridEncryptionManager$EmptyResult
org.apache.ignite.internal.managers.encryption.GridEncryptionManager$MasterKeyChangeRequest
-org.apache.ignite.internal.managers.encryption.GridEncryptionManager$NodeEncryptionKeys
+org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys
org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage
org.apache.ignite.internal.managers.indexing.GridIndexingManager$1
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
index 965e75e26cb..120c3479326 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.metastorage;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -26,7 +25,7 @@ import org.apache.ignite.internal.IgniteEx;
import
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
@@ -537,7 +536,7 @@ public class DistributedMetaStoragePersistentTest extends
DistributedMetaStorage
DiscoverySpiDataExchange exchange = GridTestUtils.getFieldValue(spi,
TcpDiscoverySpi.class, "exchange");
- List<Map<Integer, Serializable>> dataBags = new ArrayList<>();
+ List<Map<Integer, Message>> dataBags = new ArrayList<>();
spi.setDataExchange(new DiscoverySpiDataExchange() {
@Override public DiscoveryDataBag collect(DiscoveryDataBag
dataBag) {
@@ -555,11 +554,9 @@ public class DistributedMetaStoragePersistentTest extends
DistributedMetaStorage
assertEquals(1, dataBags.size());
- byte[] joiningNodeDataMarshalled =
(byte[])dataBags.get(0).get(META_STORAGE.ordinal());
+ Object joiningNodeData = dataBags.get(0).get(META_STORAGE.ordinal());
- assertNotNull(joiningNodeDataMarshalled);
-
- Object joiningNodeData =
TEST_JDK_MARSHALLER.unmarshal(joiningNodeDataMarshalled, U.gridClassLoader());
+ assertNotNull(joiningNodeData);
Object[] hist = GridTestUtils.getFieldValue(joiningNodeData, "hist");
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
index 6512c2ac9a4..d29a73f7039 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
@@ -31,7 +31,6 @@ import
org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
@@ -51,11 +50,11 @@ public class DiscoveryMessageParser {
}
/** Marshals discovery message to bytes array. */
- public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
+ public byte[] marshalZip(Message msg) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
- serializeMessage((Message)msg, out);
+ serializeMessage(msg, out);
}
catch (Exception e) {
throw new IgniteSpiException("Failed to serialize message: " +
msg, e);
@@ -65,12 +64,12 @@ public class DiscoveryMessageParser {
}
/** Unmarshals discovery message from bytes array. */
- public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) {
+ public <T extends Message> T unmarshalZip(byte[] bytes) {
try (
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
InflaterInputStream in = new InflaterInputStream(bais)
) {
- return (DiscoverySpiCustomMessage)deserializeMessage(in);
+ return deserializeMessage(in);
}
catch (Exception e) {
throw new IgniteSpiException("Failed to deserialize message.", e);
@@ -99,7 +98,7 @@ public class DiscoveryMessageParser {
}
/** */
- private Message deserializeMessage(InputStream in) throws IOException {
+ private <T extends Message> T deserializeMessage(InputStream in) throws
IOException {
DirectMessageReader msgReader = new DirectMessageReader(msgFactory,
null);
ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
@@ -127,6 +126,6 @@ public class DiscoveryMessageParser {
}
while (!finished);
- return msg;
+ return (T)msg;
}
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
index ff8311d071b..f2b5ac46368 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -38,7 +38,7 @@ class ZkJoiningNodeData implements Serializable {
/** */
@GridToStringInclude
- private Map<Integer, Serializable> discoData;
+ private Map<Integer, byte[]> discoData;
/**
* @param partCnt Number of parts in multi-parts message.
@@ -51,7 +51,7 @@ class ZkJoiningNodeData implements Serializable {
* @param node Node.
* @param discoData Discovery data.
*/
- ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable>
discoData) {
+ ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, byte[]>
discoData) {
assert node != null && node.id() != null : node;
assert discoData != null;
@@ -76,7 +76,7 @@ class ZkJoiningNodeData implements Serializable {
/**
* @return Discovery data.
*/
- Map<Integer, Serializable> discoveryData() {
+ Map<Integer, byte[]> discoveryData() {
return discoData;
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 7abe3ddf1ed..0d0531d1a9a 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -804,7 +805,8 @@ public class ZookeeperDiscoveryImpl {
exchange.collect(discoDataBag);
- ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode,
discoDataBag.joiningNodeData());
+ ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode,
+ new HashMap<>(F.viewReadOnly(discoDataBag.joiningNodeData(),
msgParser::marshalZip)));
byte[] joinDataBytes;
@@ -2070,7 +2072,7 @@ public class ZookeeperDiscoveryImpl {
if (err == null) {
DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(node.id(),
joiningNodeData.node().isClient());
- joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
+
joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(),
msgParser::unmarshalZip));
err = spi.getSpiContext().validateNode(node, joiningNodeBag);
}
@@ -2237,7 +2239,7 @@ public class ZookeeperDiscoveryImpl {
DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId,
joiningNodeData.node().isClient());
- joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
+
joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(),
msgParser::unmarshalZip));
exchange.onExchange(joiningNodeBag);
@@ -2873,7 +2875,7 @@ public class ZookeeperDiscoveryImpl {
DiscoveryDataBag dataBag = new
DiscoveryDataBag(joinedEvtData.nodeId, joiningData.node().isClient());
- dataBag.joiningNodeData(joiningData.discoveryData());
+
dataBag.joiningNodeData(F.viewReadOnly(joiningData.discoveryData(),
msgParser::unmarshalZip));
exchange.onExchange(dataBag);
}