ignite-324 Partition exchange: node should be assigned as primary only after preloading is finished. Implemented 'late affinity assignment'; also fixes: - fixed BinaryObject/BinaryReaderExImpl to properly handle the case when a class name is written instead of a type id - fixed data structures code to not retry updates inside another transaction (otherwise the wait for retry hangs) - fixed races in dynamic cache start (races between cache data received on exchange, exchange, and GridDhtAffinityAssignmentRequest processing) - changed GridCacheAdapter.asyncOp to pass the ready affinity version in the tx op (otherwise async ops can block system threads waiting for affinity)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7e223f7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7e223f7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7e223f7 Branch: refs/heads/ignite-2004 Commit: e7e223f7c6b73dc0fa28a545671a528b78a00271 Parents: 00a3937 Author: sboikov <[email protected]> Authored: Tue Apr 5 14:37:23 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Apr 5 14:37:25 2016 +0300 ---------------------------------------------------------------------- .../ignite/cache/affinity/AffinityFunction.java | 2 + .../configuration/IgniteConfiguration.java | 50 + .../apache/ignite/internal/IgniteKernal.java | 2 + .../ignite/internal/IgniteNodeAttributes.java | 3 + .../internal/binary/BinaryObjectImpl.java | 52 +- .../binary/BinaryObjectOffheapImpl.java | 40 +- .../internal/binary/BinaryReaderExImpl.java | 62 +- .../ignite/internal/binary/BinaryTypeImpl.java | 8 + .../ignite/internal/binary/BinaryUtils.java | 4 +- .../internal/binary/GridBinaryMarshaller.java | 4 +- .../binary/builder/BinaryBuilderReader.java | 11 +- .../communication/GridIoMessageFactory.java | 4 +- .../discovery/GridDiscoveryManager.java | 70 +- .../affinity/GridAffinityAssignment.java | 49 +- .../affinity/GridAffinityAssignmentCache.java | 246 +- .../affinity/GridAffinityProcessor.java | 2 +- .../processors/affinity/GridAffinityUtils.java | 3 +- .../cache/CacheAffinityChangeMessage.java | 160 ++ .../cache/CacheAffinitySharedManager.java | 1805 ++++++++++++ .../cache/DynamicCacheChangeRequest.java | 17 + .../cache/DynamicCacheDescriptor.java | 49 + .../processors/cache/GridCacheAdapter.java | 276 +- .../cache/GridCacheAffinityManager.java | 125 +- .../cache/GridCacheClearAllRunnable.java | 1 - .../processors/cache/GridCacheContext.java | 2 +- .../cache/GridCacheEvictionManager.java | 18 +- .../processors/cache/GridCacheIoManager.java | 29 +- 
.../processors/cache/GridCacheMapEntry.java | 1 - .../cache/GridCacheMvccCandidate.java | 16 +- .../GridCachePartitionExchangeManager.java | 154 +- .../processors/cache/GridCachePreloader.java | 18 +- .../cache/GridCachePreloaderAdapter.java | 12 +- .../processors/cache/GridCacheProcessor.java | 216 +- .../processors/cache/GridCacheProxyImpl.java | 13 + .../cache/GridCacheSharedContext.java | 18 +- .../processors/cache/GridCacheUtils.java | 122 - .../processors/cache/IgniteInternalCache.java | 5 + .../cache/affinity/GridCacheAffinityImpl.java | 2 +- .../CacheDataStructuresManager.java | 4 +- .../distributed/GridCacheCommittedTxInfo.java | 1 + .../GridDistributedCacheAdapter.java | 2 +- .../GridDistributedLockResponse.java | 8 - .../GridDistributedTxRemoteAdapter.java | 9 +- .../dht/CacheDistributedGetFutureAdapter.java | 3 - .../dht/GridClientPartitionTopology.java | 10 +- .../dht/GridDhtAffinityAssignmentResponse.java | 198 +- .../dht/GridDhtAssignmentFetchFuture.java | 80 +- .../distributed/dht/GridDhtCacheAdapter.java | 56 +- .../cache/distributed/dht/GridDhtGetFuture.java | 2 + .../distributed/dht/GridDhtGetSingleFuture.java | 2 + .../distributed/dht/GridDhtLockFuture.java | 7 - .../dht/GridDhtPartitionTopology.java | 10 +- .../dht/GridDhtPartitionTopologyImpl.java | 328 ++- .../dht/GridDhtTransactionalCacheAdapter.java | 1 + .../distributed/dht/GridDhtTxFinishFuture.java | 18 +- .../cache/distributed/dht/GridDhtTxLocal.java | 6 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 4 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 10 +- .../cache/distributed/dht/GridDhtTxRemote.java | 4 + .../dht/GridPartitionedGetFuture.java | 8 +- .../dht/GridPartitionedSingleGetFuture.java | 8 +- .../dht/atomic/GridDhtAtomicCache.java | 1 + .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 6 +- .../dht/preloader/GridDhtPartitionDemander.java | 19 +- .../dht/preloader/GridDhtPartitionMap2.java | 4 +- 
.../dht/preloader/GridDhtPartitionSupplier.java | 29 +- .../GridDhtPartitionsExchangeFuture.java | 1547 +++++----- .../dht/preloader/GridDhtPreloader.java | 103 +- .../distributed/near/GridNearCacheAdapter.java | 2 +- .../distributed/near/GridNearCacheEntry.java | 118 +- .../distributed/near/GridNearGetFuture.java | 26 +- ...arOptimisticSerializableTxPrepareFuture.java | 5 +- .../near/GridNearOptimisticTxPrepareFuture.java | 34 +- ...ridNearOptimisticTxPrepareFutureAdapter.java | 16 +- .../near/GridNearTransactionalCache.java | 7 +- .../near/GridNearTxFinishFuture.java | 42 +- .../cache/distributed/near/GridNearTxLocal.java | 26 +- .../near/GridNearTxPrepareFutureAdapter.java | 2 +- .../distributed/near/GridNearTxRemote.java | 19 + .../processors/cache/dr/GridCacheDrManager.java | 4 +- .../cache/dr/GridOsCacheDrManager.java | 2 +- .../cache/local/GridLocalCacheEntry.java | 27 - .../cache/local/GridLocalLockFuture.java | 15 - .../cache/query/GridCacheQueryManager.java | 15 - .../cache/transactions/IgniteTxAdapter.java | 14 +- .../cache/transactions/IgniteTxEntry.java | 8 +- .../cache/transactions/IgniteTxHandler.java | 16 +- .../transactions/IgniteTxLocalAdapter.java | 97 +- .../cache/transactions/IgniteTxLocalEx.java | 13 +- .../cache/transactions/IgniteTxManager.java | 93 +- .../cache/version/GridCacheVersion.java | 14 +- .../cache/version/GridCacheVersionEx.java | 9 + .../cache/version/GridCacheVersionManager.java | 21 +- .../continuous/GridContinuousProcessor.java | 1 - .../datastructures/DataStructuresProcessor.java | 4 +- .../processors/odbc/OdbcMessageParser.java | 2 +- .../platform/PlatformContextImpl.java | 1 + .../service/GridServiceProcessor.java | 64 +- .../ignite/internal/util/IgniteUtils.java | 16 + .../internal/util/future/GridFutureAdapter.java | 3 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 127 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 1 + .../GridCacheAffinityBackupsSelfTest.java | 2 +- .../affinity/AffinityClientNodeSelfTest.java | 4 
+- .../fair/FairAffinityFunctionNodesSelfTest.java | 2 + .../ignite/internal/GridAffinitySelfTest.java | 5 +- .../GridTaskFailoverAffinityRunTest.java | 6 +- .../IgniteClientReconnectAbstractTest.java | 8 +- .../IgniteClientReconnectAtomicsTest.java | 8 +- .../IgniteClientReconnectCollectionsTest.java | 4 +- .../IgniteClientReconnectComputeTest.java | 6 +- .../IgniteClientReconnectFailoverTest.java | 2 + .../IgniteClientReconnectServicesTest.java | 4 +- .../IgniteClientReconnectStreamerTest.java | 2 +- .../internal/TestRecordingCommunicationSpi.java | 18 +- .../cache/CacheAffinityCallSelfTest.java | 45 +- .../processors/cache/CacheNamesSelfTest.java | 16 +- ...cheNearUpdateTopologyChangeAbstractTest.java | 2 + .../cache/CacheReadThroughRestartSelfTest.java | 2 + .../GridCacheAbstractRemoveFailureTest.java | 10 +- .../cache/GridCacheDeploymentSelfTest.java | 2 + .../cache/GridCacheEntryVersionSelfTest.java | 2 +- ...ridCacheStoreManagerDeserializationTest.java | 40 +- ...acheTcpClientDiscoveryMultiThreadedTest.java | 2 +- .../GridCacheVersionTopologyChangeTest.java | 246 ++ .../IgniteCacheEntryProcessorNodeJoinTest.java | 147 +- .../cache/IgniteCacheIncrementTxTest.java | 299 ++ .../IgniteCacheP2pUnmarshallingErrorTest.java | 1 - ...CacheP2pUnmarshallingRebalanceErrorTest.java | 36 +- .../IgniteCacheP2pUnmarshallingTxErrorTest.java | 2 + .../IgniteClientAffinityAssignmentSelfTest.java | 2 +- ...niteDynamicCacheStartStopConcurrentTest.java | 6 +- .../cache/IgniteTxReentryAbstractSelfTest.java | 2 +- ...eAbstractDataStructuresFailoverSelfTest.java | 12 +- .../CacheGetInsideLockChangingTopologyTest.java | 6 + ...eLateAffinityAssignmentFairAffinityTest.java | 32 + .../CacheLateAffinityAssignmentTest.java | 2688 ++++++++++++++++++ .../GridCacheAbstractJobExecutionTest.java | 6 +- .../GridCacheTransformEventSelfTest.java | 2 +- ...niteCacheClientNodeChangingTopologyTest.java | 28 +- ...teCacheClientNodePartitionsExchangeTest.java | 85 +- 
.../IgniteCacheClientReconnectTest.java | 2 + .../IgniteCacheNearRestartRollbackSelfTest.java | 28 +- ...idCachePartitionedPreloadEventsSelfTest.java | 11 + ...ridCachePartitionedUnloadEventsSelfTest.java | 2 + .../IgniteCachePutRetryAbstractSelfTest.java | 4 +- ...imaryWriteOrderMultiNodeFullApiSelfTest.java | 35 + .../near/GridCacheNearJobExecutionSelfTest.java | 2 - .../near/GridCacheNearMultiNodeSelfTest.java | 4 +- .../near/GridCacheNearReadersSelfTest.java | 2 + .../near/GridCacheNearTxForceKeyTest.java | 6 +- ...LateAffDisabledMultiNodeFullApiSelfTest.java | 34 + ...achePartitionedMultiNodeCounterSelfTest.java | 43 +- ...achePartitionedMultiNodeFullApiSelfTest.java | 2 + ...idCacheRendezvousAffinityClientSelfTest.java | 2 + .../GridCacheRebalancingSyncSelfTest.java | 83 +- .../GridCacheReplicatedJobExecutionTest.java | 2 - ...ContinuousQueryFailoverAbstractSelfTest.java | 36 +- ...BehindStorePartitionedMultiNodeSelfTest.java | 11 +- .../loadtests/hashmap/GridCacheTestContext.java | 2 + .../testframework/junits/GridAbstractTest.java | 3 +- .../junits/common/GridCommonAbstractTest.java | 116 +- .../IgniteCacheFullApiSelfTestSuite.java | 6 + .../testsuites/IgniteCacheTestSuite2.java | 4 + .../testsuites/IgniteCacheTestSuite3.java | 2 + .../testsuites/IgniteCacheTestSuite5.java | 5 + .../processors/hadoop/HadoopContext.java | 3 +- .../child/HadoopChildProcessRunner.java | 2 +- .../HadoopIgfs20FileSystemAbstractSelfTest.java | 2 + .../IgniteHadoopFileSystemAbstractSelfTest.java | 2 + .../h2/twostep/GridReduceQueryExecutor.java | 4 +- .../cache/IgniteClientReconnectQueriesTest.java | 4 +- .../IgniteCacheQueryNodeRestartSelfTest2.java | 3 + .../Continuous/ContinuousQueryAbstractTest.cs | 25 +- .../Apache.Ignite.Core.Tests/ExceptionsTest.cs | 5 + 176 files changed, 8907 insertions(+), 2407 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java index cd12ab3..3af2a4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.List; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; /** * Cache key affinity which maps keys to nodes. This interface is utilized for @@ -47,6 +48,7 @@ import org.apache.ignite.cluster.ClusterNode; * {@link AffinityKeyMapped @AffinityKeyMapped} documentation. 
* @see AffinityKeyMapped * @see AffinityKeyMapper + * @see IgniteConfiguration#isLateAffinityAssignment() */ public interface AffinityFunction extends Serializable { /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index e06978f..1aa3920 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -32,6 +32,8 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheKeyConfiguration; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; @@ -202,6 +204,9 @@ public class IgniteConfiguration { /** Default value for cache sanity check enabled flag. */ public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true; + /** Default value for late affinity assignment flag. */ + public static final boolean DFLT_LATE_AFF_ASSIGNMENT = true; + /** Default failure detection timeout in millis. */ @SuppressWarnings("UnnecessaryBoxing") public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000); @@ -441,6 +446,9 @@ public class IgniteConfiguration { /** */ private BinaryConfiguration binaryCfg; + /** */ + private boolean lateAffAssignment = DFLT_LATE_AFF_ASSIGNMENT; + /** * Creates valid grid configuration with all default values. 
*/ @@ -497,6 +505,7 @@ public class IgniteConfiguration { hadoopCfg = cfg.getHadoopConfiguration(); inclEvtTypes = cfg.getIncludeEventTypes(); includeProps = cfg.getIncludeProperties(); + lateAffAssignment = cfg.isLateAffinityAssignment(); lifecycleBeans = cfg.getLifecycleBeans(); locHost = cfg.getLocalHost(); log = cfg.getGridLogger(); @@ -2519,6 +2528,47 @@ public class IgniteConfiguration { this.platformCfg = platformCfg; } + /** + * Whether or not late affinity assignment mode should be used. + * <p> + * On each topology change, for each started cache partition-to-node mapping is + * calculated using {@link AffinityFunction} configured for cache. When late + * affinity assignment mode is disabled then new affinity mapping is applied immediately. + * <p> + * With late affinity assignment mode if primary node was changed for some partition, but data for this + * partition is not rebalanced yet on this node, then current primary is not changed and new primary is temporary + * assigned as backup. This nodes becomes primary only when rebalancing for all assigned primary partitions is + * finished. This mode can show better performance for cache operations, since when cache primary node + * executes some operation and data is not rebalanced yet, then it sends additional message to force rebalancing + * from other nodes. + * <p> + * Note, that {@link Affinity} interface provides assignment information taking into account late assignment, + * so while rebalancing for new primary nodes is not finished it can return assignment which differs + * from assignment calculated by {@link AffinityFunction#assignPartitions}. + * <p> + * This property should have the same value for all nodes in cluster. + * <p> + * If not provided, default value is {@link #DFLT_LATE_AFF_ASSIGNMENT}. + * + * @return Late affinity assignment flag. 
+ * @see AffinityFunction + */ + public boolean isLateAffinityAssignment() { + return lateAffAssignment; + } + + /** + * Sets late affinity assignment flag. + * + * @param lateAffAssignment Late affinity assignment flag. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setLateAffinityAssignment(boolean lateAffAssignment) { + this.lateAffAssignment = lateAffAssignment; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index cec4b74..20795fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -191,6 +191,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; @@ -1265,6 +1266,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName()); add(ATTR_MARSHALLER_USE_DFLT_SUID, 
getBoolean(IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID, OptimizedMarshaller.USE_DFLT_SUID)); + add(ATTR_LATE_AFFINITY_ASSIGNMENT, cfg.isLateAffinityAssignment()); if (cfg.getMarshaller() instanceof BinaryMarshaller) { add(ATTR_MARSHALLER_COMPACT_FOOTER, cfg.getBinaryConfiguration() == null ? http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index da6f40d..3493eae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -150,6 +150,9 @@ public final class IgniteNodeAttributes { /** Binary configuration. */ public static final String ATTR_BINARY_CONFIGURATION = ATTR_PREFIX + ".binary.config"; + /** Late affinity assignment mode. */ + public static final String ATTR_LATE_AFFINITY_ASSIGNMENT = ATTR_PREFIX + ".cache.lateAffinity"; + /** * Enforces singleton. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java index 173bb6e..0997d6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java @@ -222,7 +222,23 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern /** {@inheritDoc} */ @Override public int typeId() { - return BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.TYPE_ID_POS); + int off = start + GridBinaryMarshaller.TYPE_ID_POS; + + int typeId = BinaryPrimitives.readInt(arr, off); + + if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) { + off = start + GridBinaryMarshaller.DFLT_HDR_LEN; + + assert arr[off] == GridBinaryMarshaller.STRING : arr[off]; + + int len = BinaryPrimitives.readInt(arr, ++off); + + String clsName = new String(arr, off + 4, len, UTF_8); + + typeId = ctx.typeId(clsName); + } + + return typeId; } /** {@inheritDoc} */ @@ -236,13 +252,13 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public <F> F field(String fieldName) throws BinaryObjectException { - return (F) reader(null).unmarshalField(fieldName); + return (F) reader(null, false).unmarshalField(fieldName); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public <F> F field(int fieldId) throws BinaryObjectException { - return (F) reader(null).unmarshalField(fieldId); + return (F) reader(null, false).unmarshalField(fieldId); } /** {@inheritDoc} */ @@ -251,20 +267,20 @@ public final class 
BinaryObjectImpl extends BinaryObjectExImpl implements Extern Object val; // Calculate field position. - int schemaOffset = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS); + int schemaOff = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS); short flags = BinaryPrimitives.readShort(arr, start + GridBinaryMarshaller.FLAGS_POS); int fieldIdLen = BinaryUtils.isCompactFooter(flags) ? 0 : BinaryUtils.FIELD_ID_LEN; - int fieldOffsetLen = BinaryUtils.fieldOffsetLength(flags); + int fieldOffLen = BinaryUtils.fieldOffsetLength(flags); - int fieldOffsetPos = start + schemaOffset + order * (fieldIdLen + fieldOffsetLen) + fieldIdLen; + int fieldOffsetPos = start + schemaOff + order * (fieldIdLen + fieldOffLen) + fieldIdLen; int fieldPos; - if (fieldOffsetLen == BinaryUtils.OFFSET_1) + if (fieldOffLen == BinaryUtils.OFFSET_1) fieldPos = start + ((int)BinaryPrimitives.readByte(arr, fieldOffsetPos) & 0xFF); - else if (fieldOffsetLen == BinaryUtils.OFFSET_2) + else if (fieldOffLen == BinaryUtils.OFFSET_2) fieldPos = start + ((int)BinaryPrimitives.readShort(arr, fieldOffsetPos) & 0xFFFF); else fieldPos = start + BinaryPrimitives.readInt(arr, fieldOffsetPos); @@ -387,12 +403,12 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override protected <F> F field(BinaryReaderHandles rCtx, String fieldName) { - return (F)reader(rCtx).unmarshalField(fieldName); + return (F)reader(rCtx, false).unmarshalField(fieldName); } /** {@inheritDoc} */ @Override public boolean hasField(String fieldName) { - return reader(null).findFieldByName(fieldName); + return reader(null, false).findFieldByName(fieldName); } /** {@inheritDoc} */ @@ -423,7 +439,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern /** {@inheritDoc} */ @Override protected BinarySchema createSchema() { - return 
reader(null).getOrCreateSchema(); + return reader(null, false).getOrCreateSchema(); } /** {@inheritDoc} */ @@ -537,7 +553,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern */ private Object deserializeValue(@Nullable CacheObjectContext coCtx) { BinaryReaderExImpl reader = reader(null, - coCtx != null ? coCtx.kernalContext().config().getClassLoader() : ctx.configuration().getClassLoader()); + coCtx != null ? coCtx.kernalContext().config().getClassLoader() : ctx.configuration().getClassLoader(), true); Object obj0 = reader.deserialize(); @@ -563,25 +579,29 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern * Create new reader for this object. * * @param rCtx Reader context. + * @param ldr Class loader. + * @param forUnmarshal {@code True} if reader is need to unmarshal object. * @return Reader. */ - private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, @Nullable ClassLoader ldr) { + private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, @Nullable ClassLoader ldr, boolean forUnmarshal) { if (ldr == null) ldr = ctx.configuration().getClassLoader(); return new BinaryReaderExImpl(ctx, BinaryHeapInputStream.create(arr, start), ldr, - rCtx); + rCtx, + forUnmarshal); } /** * Create new reader for this object. * * @param rCtx Reader context. + * @param forUnmarshal {@code True} if reader is need to unmarshal object. * @return Reader. 
*/ - private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx) { - return reader(rCtx, null); + private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, boolean forUnmarshal) { + return reader(rCtx, null, forUnmarshal); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java index 27d3012..c687192 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java @@ -91,7 +91,17 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter /** {@inheritDoc} */ @Override public int typeId() { - return BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.TYPE_ID_POS); + int typeId = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.TYPE_ID_POS); + + if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) { + int off = start + GridBinaryMarshaller.DFLT_HDR_LEN; + + String clsName = BinaryUtils.doReadClassName(new BinaryOffheapInputStream(off, size)); + + typeId = ctx.typeId(clsName); + } + + return typeId; } /** {@inheritDoc} */ @@ -111,7 +121,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter /** {@inheritDoc} */ @Override protected BinarySchema createSchema() { - return reader(null).getOrCreateSchema(); + return reader(null, false).getOrCreateSchema(); } /** {@inheritDoc} */ @@ -145,13 +155,13 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public <F> F field(String fieldName) 
throws BinaryObjectException { - return (F) reader(null).unmarshalField(fieldName); + return (F) reader(null, false).unmarshalField(fieldName); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public <F> F field(int fieldId) throws BinaryObjectException { - return (F) reader(null).unmarshalField(fieldId); + return (F) reader(null, false).unmarshalField(fieldId); } /** {@inheritDoc} */ @@ -160,20 +170,20 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter Object val; // Calculate field position. - int schemaOffset = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS); + int schemaOff = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS); short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS); int fieldIdLen = BinaryUtils.isCompactFooter(flags) ? 0 : BinaryUtils.FIELD_ID_LEN; - int fieldOffsetLen = BinaryUtils.fieldOffsetLength(flags); + int fieldOffLen = BinaryUtils.fieldOffsetLength(flags); - int fieldOffsetPos = start + schemaOffset + order * (fieldIdLen + fieldOffsetLen) + fieldIdLen; + int fieldOffsetPos = start + schemaOff + order * (fieldIdLen + fieldOffLen) + fieldIdLen; int fieldPos; - if (fieldOffsetLen == BinaryUtils.OFFSET_1) + if (fieldOffLen == BinaryUtils.OFFSET_1) fieldPos = start + ((int)BinaryPrimitives.readByte(ptr, fieldOffsetPos) & 0xFF); - else if (fieldOffsetLen == BinaryUtils.OFFSET_2) + else if (fieldOffLen == BinaryUtils.OFFSET_2) fieldPos = start + ((int)BinaryPrimitives.readShort(ptr, fieldOffsetPos) & 0xFFFF); else fieldPos = start + BinaryPrimitives.readInt(ptr, fieldOffsetPos); @@ -301,12 +311,12 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override protected <F> F field(BinaryReaderHandles rCtx, String fieldName) { - return (F)reader(rCtx).unmarshalField(fieldName); + return (F)reader(rCtx, 
false).unmarshalField(fieldName); } /** {@inheritDoc} */ @Override public boolean hasField(String fieldName) { - return reader(null).findFieldByName(fieldName); + return reader(null, false).findFieldByName(fieldName); } /** {@inheritDoc} */ @@ -401,16 +411,17 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter * @return Deserialized value. */ private Object deserializeValue() { - return reader(null).deserialize(); + return reader(null, true).deserialize(); } /** * Create new reader for this object. * * @param rCtx Reader context. + * @param forUnmarshal {@code True} if reader is needed to unmarshal object. * @return Reader. */ - private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx) { + private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, boolean forUnmarshal) { BinaryOffheapInputStream stream = new BinaryOffheapInputStream(ptr, size, false); stream.position(start); @@ -418,6 +429,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter return new BinaryReaderExImpl(ctx, stream, ctx.configuration().getClassLoader(), - rCtx); + rCtx, + forUnmarshal); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java index f9e7aa5..69aecbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java @@ -127,7 +127,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina private final int fieldIdLen; /** Offset size in bytes. 
*/ - private final int fieldOffsetLen; + private final int fieldOffLen; /** Object schema. */ private final BinarySchema schema; @@ -147,9 +147,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina * @param ctx Context. * @param in Input stream. * @param ldr Class loader. + * @param forUnmarshal {@code True} if reader is needed to unmarshal object. */ - public BinaryReaderExImpl(BinaryContext ctx, BinaryInputStream in, ClassLoader ldr) { - this(ctx, in, ldr, null); + public BinaryReaderExImpl(BinaryContext ctx, BinaryInputStream in, ClassLoader ldr, boolean forUnmarshal) { + this(ctx, + in, + ldr, + null, + forUnmarshal); } /** @@ -159,10 +164,19 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina * @param in Input stream. * @param ldr Class loader. * @param hnds Context. + * @param forUnmarshal {@code True} if reader is need to unmarshal object. */ - public BinaryReaderExImpl(BinaryContext ctx, BinaryInputStream in, ClassLoader ldr, - @Nullable BinaryReaderHandles hnds) { - this(ctx, in, ldr, hnds, false); + public BinaryReaderExImpl(BinaryContext ctx, + BinaryInputStream in, + ClassLoader ldr, + @Nullable BinaryReaderHandles hnds, + boolean forUnmarshal) { + this(ctx, + in, + ldr, + hnds, + false, + forUnmarshal); } /** @@ -173,9 +187,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina * @param ldr Class loader. * @param hnds Context. * @param skipHdrCheck Whether to skip header check. - */ - public BinaryReaderExImpl(BinaryContext ctx, BinaryInputStream in, ClassLoader ldr, - @Nullable BinaryReaderHandles hnds, boolean skipHdrCheck) { + * @param forUnmarshal {@code True} if reader is need to unmarshal object. + */ + public BinaryReaderExImpl(BinaryContext ctx, + BinaryInputStream in, + ClassLoader ldr, + @Nullable BinaryReaderHandles hnds, + boolean skipHdrCheck, + boolean forUnmarshal) { // Initialize base members. 
this.ctx = ctx; this.in = in; @@ -202,7 +221,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina // Get trivial flag values. userType = BinaryUtils.isUserType(flags); fieldIdLen = BinaryUtils.fieldIdLength(flags); - fieldOffsetLen = BinaryUtils.fieldOffsetLength(flags); + fieldOffLen = BinaryUtils.fieldOffsetLength(flags); // Calculate footer borders and raw offset. if (BinaryUtils.hasSchema(flags)) { @@ -233,8 +252,12 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina if (typeId0 == UNREGISTERED_TYPE_ID) { int off = in.position(); - // Registers class by type ID, at least locally if the cache is not ready yet. - typeId = ctx.descriptorForClass(BinaryUtils.doReadClass(in, ctx, ldr, typeId0), false).typeId(); + if (forUnmarshal) { + // Registers class by type ID, at least locally if the cache is not ready yet. + typeId = ctx.descriptorForClass(BinaryUtils.doReadClass(in, ctx, ldr, typeId0), false).typeId(); + } + else + typeId = ctx.typeId(BinaryUtils.doReadClassName(in)); int clsNameLen = in.position() - off; @@ -259,7 +282,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina schemaId = 0; userType = false; fieldIdLen = 0; - fieldOffsetLen = 0; + fieldOffLen = 0; schema = null; } @@ -1640,6 +1663,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina } /** + * @param fieldId Field ID. * @return Deserialized object. * @throws BinaryObjectException If failed. 
*/ @@ -1647,7 +1671,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina if (!findFieldById(fieldId)) return null; - return new BinaryReaderExImpl(ctx, in, ldr, hnds).deserialize(); + return new BinaryReaderExImpl(ctx, in, ldr, hnds, true).deserialize(); } /** @@ -1717,7 +1741,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina builder.addField(fieldId); - searchPos += BinaryUtils.FIELD_ID_LEN + fieldOffsetLen; + searchPos += BinaryUtils.FIELD_ID_LEN + fieldOffLen; } return builder.build(); @@ -1851,9 +1875,9 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina */ private boolean trySetUserFieldPosition(int order) { if (order != BinarySchema.ORDER_NOT_FOUND) { - int offsetPos = footerStart + order * (fieldIdLen + fieldOffsetLen) + fieldIdLen; + int offsetPos = footerStart + order * (fieldIdLen + fieldOffLen) + fieldIdLen; - int pos = start + BinaryUtils.fieldOffsetRelative(in, offsetPos, fieldOffsetLen); + int pos = start + BinaryUtils.fieldOffsetRelative(in, offsetPos, fieldOffLen); streamPosition(pos); @@ -1884,14 +1908,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina if (id0 == id) { int pos = start + BinaryUtils.fieldOffsetRelative(in, searchPos + BinaryUtils.FIELD_ID_LEN, - fieldOffsetLen); + fieldOffLen); streamPosition(pos); return true; } - searchPos += BinaryUtils.FIELD_ID_LEN + fieldOffsetLen; + searchPos += BinaryUtils.FIELD_ID_LEN + fieldOffLen; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java index d4fd625..132702c 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java @@ -20,12 +20,15 @@ package org.apache.ignite.internal.binary; import org.apache.ignite.binary.BinaryType; import java.util.Collection; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; /** * Binary type implementation. */ public class BinaryTypeImpl implements BinaryType { /** Binary context. */ + @GridToStringExclude private final BinaryContext ctx; /** Type metadata. */ @@ -90,4 +93,9 @@ public class BinaryTypeImpl implements BinaryType { public BinaryMetadata metadata() { return meta; } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(BinaryTypeImpl.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java index 37f1d6a..c0202dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java @@ -1390,7 +1390,7 @@ public class BinaryUtils { * @param in Input stream. * @return Class name. 
*/ - private static String doReadClassName(BinaryInputStream in) { + public static String doReadClassName(BinaryInputStream in) { byte flag = in.readByte(); if (flag != GridBinaryMarshaller.STRING) @@ -1565,7 +1565,7 @@ public class BinaryUtils { */ @Nullable public static Object doReadObject(BinaryInputStream in, BinaryContext ctx, ClassLoader ldr, BinaryReaderHandlesHolder handles) throws BinaryObjectException { - return new BinaryReaderExImpl(ctx, in, ldr, handles.handles()).deserialize(); + return new BinaryReaderExImpl(ctx, in, ldr, handles.handles(), true).deserialize(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java index 67e741b..00d8871 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java @@ -292,7 +292,7 @@ public class GridBinaryMarshaller { BinaryContext oldCtx = pushContext(ctx); try { - return (T)new BinaryReaderExImpl(ctx, BinaryHeapInputStream.create(arr, 0), ldr).deserialize(); + return (T)new BinaryReaderExImpl(ctx, BinaryHeapInputStream.create(arr, 0), ldr, true).deserialize(); } finally { popContext(oldCtx); @@ -340,7 +340,7 @@ public class GridBinaryMarshaller { public BinaryReaderExImpl reader(BinaryInputStream stream) { assert stream != null; - return new BinaryReaderExImpl(ctx, stream, null); + return new BinaryReaderExImpl(ctx, stream, null, true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java index 662ad1d..347fb2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java @@ -55,7 +55,7 @@ public class BinaryBuilderReader implements BinaryPositionReadable { /** */ private int pos; - /* + /** * Constructor. * * @param objImpl Binary object @@ -67,7 +67,8 @@ public class BinaryBuilderReader implements BinaryPositionReadable { reader = new BinaryReaderExImpl(ctx, BinaryHeapInputStream.create(arr, pos), - ctx.configuration().getClassLoader()); + ctx.configuration().getClassLoader(), + false); objMap = new HashMap<>(); } @@ -83,7 +84,11 @@ public class BinaryBuilderReader implements BinaryPositionReadable { this.arr = other.arr; this.pos = start; - reader = new BinaryReaderExImpl(ctx, BinaryHeapInputStream.create(arr, start), null, other.reader.handles()); + reader = new BinaryReaderExImpl(ctx, + BinaryHeapInputStream.create(arr, start), + null, + other.reader.handles(), + false); this.objMap = other.objMap; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 3c7f378..47b1c5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -26,13 +26,13 @@ import org.apache.ignite.internal.GridJobSiblingsRequest; import org.apache.ignite.internal.GridJobSiblingsResponse; import org.apache.ignite.internal.GridTaskCancelRequest; import org.apache.ignite.internal.GridTaskSessionRequest; +import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; +import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest; import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; -import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; -import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; import org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsValue; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 5b56d38..7f02498 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; @@ -124,6 +125,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID; @@ -508,9 +510,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { verChanged = true; } + + nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); } + else { + nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); - nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); + ctx.cache().onDiscoveryEvent(type, node, nextTopVer); + } if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { @@ -996,6 +1003,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID); boolean locMarshUseDfltSuidBool = locMarshUseDfltSuid == null ? 
true : locMarshUseDfltSuid; + boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); + for (ClusterNode n : nodes) { int rmtJvmMajVer = nodeJavaMajorVersion(n); @@ -1054,6 +1063,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ", rmtNodeAddrs=" + U.addressesAsString(n) + ", locNodeId=" + locNode.id() + ", rmtNodeId=" + n.id() + ']'); } + + boolean rmtLateAssign; + + if (n.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0) + rmtLateAssign = n.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); + else + rmtLateAssign = false; + + if (locDelayAssign != rmtLateAssign) { + throw new IgniteCheckedException("Remote node has cache affinity assignment mode different from local " + + "[locId8=" + U.id8(locNode.id()) + + ", locDelayAssign=" + locDelayAssign + + ", rmtId8=" + U.id8(n.id()) + + ", rmtLateAssign=" + rmtLateAssign + + ", rmtAddrs=" + U.addressesAsString(n) + ']'); + } } if (log.isDebugEnabled()) @@ -1456,7 +1481,36 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> nodes(long topVer) { - return resolveDiscoCache(null, new AffinityTopologyVersion(topVer)).allNodes(); + return nodes(new AffinityTopologyVersion(topVer)); + } + + /** + * Gets all nodes for given topology version. + * + * @param topVer Topology version. + * @return Collection of cache nodes. + */ + public Collection<ClusterNode> nodes(AffinityTopologyVersion topVer) { + return resolveDiscoCache(null, topVer).allNodes(); + } + + /** + * @param topVer Topology version. + * @return All server nodes for given topology version. + */ + public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) { + return resolveDiscoCache(null, topVer).srvNodes; + } + + /** + * Gets node from history for given topology version. + * + * @param topVer Topology version. + * @param id Node ID. + * @return Node. 
+ */ + public ClusterNode node(AffinityTopologyVersion topVer, UUID id) { + return resolveDiscoCache(null, topVer).node(id); } /** @@ -2394,6 +2448,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** All nodes. */ private final List<ClusterNode> allNodes; + /** All server nodes. */ + private final List<ClusterNode> srvNodes; + /** All nodes with at least one cache configured. */ @GridToStringInclude private final Collection<ClusterNode> allNodesWithCaches; @@ -2500,8 +2557,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Set<String> nearEnabledSet = new HashSet<>(); + List<ClusterNode> srvNodes = new ArrayList<>(); + for (ClusterNode node : allNodes) { assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; + assert !node.isDaemon(); + + if (!CU.clientNode(node)) + srvNodes.add(node); if (node.order() > maxOrder0) maxOrder0 = node.order(); @@ -2568,6 +2631,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { nodes.add(node); } + Collections.sort(srvNodes, CU.nodeComparator(true)); + // Need second iteration to add this node to all previous node versions. 
for (ClusterNode node : allNodes) { IgniteProductVersion nodeVer = U.productVersion(node); @@ -2588,6 +2653,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches); this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches); nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet); + this.srvNodes = Collections.unmodifiableList(srvNodes); daemonNodes = Collections.unmodifiableList(new ArrayList<>( F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON)))); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 7b2bea3..92908cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -51,6 +51,12 @@ public class GridAffinityAssignment implements Serializable { /** Assignment node IDs */ private transient volatile List<HashSet<UUID>> assignmentIds; + /** Nodes having primary partitions assignments. */ + private transient volatile Set<ClusterNode> primaryPartsNodes; + + /** */ + private transient List<List<ClusterNode>> idealAssignment; + /** * Constructs cached affinity calculations item. * @@ -65,10 +71,18 @@ public class GridAffinityAssignment implements Serializable { /** * @param topVer Topology version. * @param assignment Assignment. + * @param idealAssignment Ideal assignment. 
*/ - GridAffinityAssignment(AffinityTopologyVersion topVer, List<List<ClusterNode>> assignment) { + GridAffinityAssignment(AffinityTopologyVersion topVer, + List<List<ClusterNode>> assignment, + List<List<ClusterNode>> idealAssignment) { + assert topVer != null; + assert assignment != null; + assert idealAssignment != null; + this.topVer = topVer; this.assignment = assignment; + this.idealAssignment = idealAssignment; primary = new HashMap<>(); backup = new HashMap<>(); @@ -84,11 +98,19 @@ public class GridAffinityAssignment implements Serializable { this.topVer = topVer; assignment = aff.assignment; + idealAssignment = aff.idealAssignment; primary = aff.primary; backup = aff.backup; } /** + * @return Affinity assignment computed by affinity function. + */ + public List<List<ClusterNode>> idealAssignment() { + return idealAssignment; + } + + /** * @return Affinity assignment. */ public List<List<ClusterNode>> assignment() { @@ -146,6 +168,31 @@ public class GridAffinityAssignment implements Serializable { } /** + * @return Nodes having primary partitions assignments. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + public Set<ClusterNode> primaryPartitionNodes() { + Set<ClusterNode> primaryPartsNodes0 = primaryPartsNodes; + + if (primaryPartsNodes0 == null) { + int parts = assignment.size(); + + primaryPartsNodes0 = new HashSet<>(); + + for (int p = 0; p < parts; p++) { + List<ClusterNode> nodes = assignment.get(p); + + if (nodes.size() > 0) + primaryPartsNodes0.add(nodes.get(0)); + } + + primaryPartsNodes = primaryPartsNodes0; + } + + return primaryPartsNodes0; + } + + /** * Get primary partitions for specified node ID. * * @param nodeId Node ID to get primary partitions for. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 26e4d98..0cacf68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -25,27 +25,28 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.affinity.AffinityCentralizedFunction; import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; -import org.apache.ignite.internal.processors.cache.GridCacheInternal; import 
org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedHashMap; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; @@ -56,32 +57,41 @@ public class GridAffinityAssignmentCache { /** Cache name. */ private final String cacheName; + /** */ + private final Integer cacheId; + /** Number of backups. */ - private int backups; + private final int backups; /** Affinity function. */ private final AffinityFunction aff; + /** */ + private final IgnitePredicate<ClusterNode> nodeFilter; + /** Partitions count. */ private final int partsCnt; - /** Affinity mapper function. */ - private final AffinityKeyMapper affMapper; - /** Affinity calculation results cache: topology version => partition => nodes. */ - private final ConcurrentLinkedHashMap<AffinityTopologyVersion, GridAffinityAssignment> affCache; + private final ConcurrentNavigableMap<AffinityTopologyVersion, GridAffinityAssignment> affCache; + + /** */ + private List<List<ClusterNode>> idealAssignment; /** Cache item corresponding to the head topology version. */ private final AtomicReference<GridAffinityAssignment> head; - /** Discovery manager. */ - private final GridCacheContext ctx; - /** Ready futures. */ private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>(); /** Log. */ - private IgniteLogger log; + private final IgniteLogger log; + + /** */ + private final GridKernalContext ctx; + + /** */ + private final boolean locCache; /** Node stop flag. 
*/ private volatile IgniteCheckedException stopErr; @@ -92,42 +102,62 @@ public class GridAffinityAssignmentCache { * @param ctx Kernal context. * @param cacheName Cache name. * @param aff Affinity function. - * @param affMapper Affinity key mapper. + * @param nodeFilter Node filter. * @param backups Number of backups. + * @param locCache Local cache flag. */ @SuppressWarnings("unchecked") - public GridAffinityAssignmentCache(GridCacheContext ctx, + public GridAffinityAssignmentCache(GridKernalContext ctx, String cacheName, AffinityFunction aff, - AffinityKeyMapper affMapper, - int backups) + IgnitePredicate<ClusterNode> nodeFilter, + int backups, + boolean locCache) { assert ctx != null; assert aff != null; - assert affMapper != null; this.ctx = ctx; this.aff = aff; - this.affMapper = affMapper; + this.nodeFilter = nodeFilter; this.cacheName = cacheName; this.backups = backups; + this.locCache = locCache; + + cacheId = CU.cacheId(cacheName); - log = ctx.logger(GridAffinityAssignmentCache.class); + log = ctx.log(GridAffinityAssignmentCache.class); partsCnt = aff.partitions(); - affCache = new ConcurrentLinkedHashMap<>(); + affCache = new ConcurrentSkipListMap<>(); head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); } /** - * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes - * and brought to local node on partition map exchange. + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** + * @return Cache ID. + */ + public Integer cacheId() { + return cacheId; + } + + /** + * Initializes affinity with given topology version and assignment. * * @param topVer Topology version. * @param affAssignment Affinity assignment for topology version. 
*/ public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { - GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment); + assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']'; + assert idealAssignment != null; + + GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment); affCache.put(topVer, assignment); head.set(assignment); @@ -144,11 +174,32 @@ public class GridAffinityAssignmentCache { } /** + * @param assignment Assignment. + */ + public void idealAssignment(List<List<ClusterNode>> assignment) { + this.idealAssignment = assignment; + } + + /** + * @return Assignment. + */ + @Nullable public List<List<ClusterNode>> idealAssignment() { + return idealAssignment; + } + + /** + * @return {@code True} if affinity function has {@link AffinityCentralizedFunction} annotation. + */ + public boolean centralizedAffinityFunction() { + return U.hasAnnotation(aff, AffinityCentralizedFunction.class); + } + + /** * Kernal stop callback. * * @param err Error. */ - public void onKernalStop(IgniteCheckedException err) { + public void cancelFutures(IgniteCheckedException err) { stopErr = err; for (AffinityReadyFuture fut : readyFuts.values()) @@ -159,6 +210,8 @@ public class GridAffinityAssignmentCache { * */ public void onReconnected() { + idealAssignment = null; + affCache.clear(); head.set(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); @@ -179,33 +232,23 @@ public class GridAffinityAssignmentCache { log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvt=" + discoEvt + ']'); - Iterator<AffinityTopologyVersion> it = affCache.descendingKeySet().iterator(); - - AffinityTopologyVersion prevVer = null; - - if (it.hasNext()) - prevVer = it.next(); - - GridAffinityAssignment prev = prevVer == null ? 
null : affCache.get(prevVer); + List<List<ClusterNode>> prevAssignment = idealAssignment; + // Resolve nodes snapshot for specified topology version. List<ClusterNode> sorted; - if (ctx.isLocal()) - // For local cache always use local node. - sorted = Collections.singletonList(ctx.localNode()); - else { - // Resolve nodes snapshot for specified topology version. + if (!locCache) { sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer)); Collections.sort(sorted, GridNodeOrderComparator.INSTANCE); } - - List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment(); + else + sorted = Collections.singletonList(ctx.discovery().localNode()); List<List<ClusterNode>> assignment; if (prevAssignment != null && discoEvt != null) { - boolean affNode = ctx.discovery().cacheAffinityNode(discoEvt.eventNode(), ctx.name()); + boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter); if (!affNode) assignment = prevAssignment; @@ -219,32 +262,12 @@ public class GridAffinityAssignmentCache { assert assignment != null; - GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment); - - updated = F.addIfAbsent(affCache, topVer, updated); - - // Update top version, if required. 
- while (true) { - GridAffinityAssignment headItem = head.get(); - - if (headItem.topologyVersion().compareTo(topVer) >= 0) - break; - - if (head.compareAndSet(headItem, updated)) - break; - } - - for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { - if (entry.getKey().compareTo(topVer) <= 0) { - if (log.isDebugEnabled()) - log.debug("Completing topology ready future (calculated affinity) [locNodeId=" + ctx.localNodeId() + - ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); + idealAssignment = assignment; - entry.getValue().onDone(topVer); - } - } + if (locCache) + initialize(topVer, assignment); - return updated.assignment(); + return assignment; } /** @@ -255,6 +278,8 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. */ public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) { + assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']'; + GridAffinityAssignment aff = head.get(); assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt; @@ -293,9 +318,10 @@ public class GridAffinityAssignmentCache { log.debug("Cleaning up cache for version [locNodeId=" + ctx.localNodeId() + ", topVer=" + topVer + ']'); - for (Iterator<AffinityTopologyVersion> it = affCache.keySet().iterator(); it.hasNext(); ) + for (Iterator<AffinityTopologyVersion> it = affCache.keySet().iterator(); it.hasNext(); ) { if (it.next().compareTo(topVer) < 0) it.remove(); + } } /** @@ -351,32 +377,6 @@ public class GridAffinityAssignmentCache { } /** - * NOTE: Use this method always when you need to calculate partition id for - * a key provided by user. It's required since we should apply affinity mapper - * logic in order to find a key that will eventually be passed to affinity function. - * - * @param key Key. - * @return Partition. 
- */ - public int partition(Object key) { - return aff.partition(affinityKey(key)); - } - - /** - * If Key is {@link GridCacheInternal GridCacheInternal} entry when won't passed into user's mapper and - * will use {@link GridCacheDefaultAffinityKeyMapper default}. - * - * @param key Key. - * @return Affinity key. - */ - private Object affinityKey(Object key) { - if (key instanceof CacheObject && !(key instanceof BinaryObject)) - key = ((CacheObject)key).value(ctx.cacheObjectContext(), false); - - return (key instanceof GridCacheInternal ? ctx.defaultAffMapper() : affMapper).affinityKey(key); - } - - /** * Gets affinity nodes for specified partition. * * @param part Partition. @@ -415,7 +415,7 @@ public class GridAffinityAssignmentCache { */ public void dumpDebugInfo() { if (!readyFuts.isEmpty()) { - U.warn(log, "Pending affinity ready futures [cache=" + cacheName + "]:"); + U.warn(log, "Pending affinity ready futures [cache=" + cacheName + ", lastVer=" + lastVersion() + "]:"); for (AffinityReadyFuture fut : readyFuts.values()) U.warn(log, ">>> " + fut); @@ -443,7 +443,7 @@ public class GridAffinityAssignmentCache { if (cache == null) { throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " + - "calculated [locNodeId=" + ctx.localNodeId() + + "calculated [locNode=" + ctx.discovery().localNode() + ", cache=" + cacheName + ", topVer=" + topVer + ", head=" + head.get().topologyVersion() + @@ -458,6 +458,53 @@ public class GridAffinityAssignmentCache { } /** + * @param part Partition. + * @param startVer Start version. + * @param endVer End version. + * @return {@code True} if primary changed or {@code endVer} was not found in history after {@code startVer}; {@code false} if {@code startVer} itself is not in history or primary did not change.
+ */ + public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) { + GridAffinityAssignment aff = affCache.get(startVer); + + if (aff == null) + return false; + + List<ClusterNode> nodes = aff.get(part); + + if (nodes.isEmpty()) + return true; + + ClusterNode primary = nodes.get(0); + + for (GridAffinityAssignment assignment : affCache.tailMap(startVer, false).values()) { + List<ClusterNode> nodes0 = assignment.assignment().get(part); + + if (nodes0.isEmpty()) + return true; + + if (!nodes0.get(0).equals(primary)) + return true; + + if (assignment.topologyVersion().equals(endVer)) + return false; + } + + return true; + } + + /** + * @param aff Affinity cache. + */ + public void init(GridAffinityAssignmentCache aff) { + assert aff.lastVersion().compareTo(lastVersion()) >= 0; + assert aff.idealAssignment() != null; + + idealAssignment(aff.idealAssignment()); + + initialize(aff.lastVersion(), aff.assignments(aff.lastVersion())); + } + + /** * @param topVer Topology version to wait. 
*/ private void awaitTopologyVersion(AffinityTopologyVersion topVer) { @@ -511,5 +558,10 @@ public class GridAffinityAssignmentCache { return done; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AffinityReadyFuture.class, this); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 8a0194c..6b289e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -320,7 +320,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { AffinityInfo info = new AffinityInfo( cctx.config().getAffinity(), cctx.config().getAffinityMapper(), - new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer)), + new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)), cctx.cacheObjectContext()); IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java index f670960..2952ebc 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java @@ -150,6 +150,7 @@ class GridAffinityUtils { /** * @param cacheName Cache name. + * @param topVer Topology version. */ private AffinityJob(@Nullable String cacheName, @NotNull AffinityTopologyVersion topVer) { this.cacheName = cacheName; @@ -182,7 +183,7 @@ class GridAffinityUtils { return F.t( affinityMessage(ctx, cctx.config().getAffinity()), affinityMessage(ctx, cctx.config().getAffinityMapper()), - new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer))); + new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer))); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java new file mode 100644 index 0000000..8cff65e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Discovery custom message that carries a change of cache affinity assignment. + * It is created either after cache rebalance finished or to finish an exchange; + * each case has its own constructor and fills a different subset of the fields. + */ +public class CacheAffinityChangeMessage implements DiscoveryCustomMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Message ID, generated with {@link IgniteUuid#randomUuid()} when the object is created. */ + private IgniteUuid id = IgniteUuid.randomUuid(); + + /** Topology version; assigned only by the constructor used after rebalance finished. */ + private AffinityTopologyVersion topVer; + + /** Exchange ID; assigned only by the constructor used to finish exchange. */ + private GridDhtPartitionExchangeId exchId; + + /** Assignment change; presumably cache ID to partition to node IDs (TODO confirm against callers). */ + private Map<Integer, Map<Integer, List<UUID>>> assignmentChange; + + /** Cache deployment IDs; assigned only by the constructor used after rebalance finished. */ + private Map<Integer, IgniteUuid> cacheDeploymentIds; + + /** Partitions message; assigned only by the constructor used to finish exchange. */ + private GridDhtPartitionsFullMessage partsMsg; + + /** Transient flag: {@code true} if request should trigger partition exchange. */ + private transient boolean exchangeNeeded; + + /** + * Constructor used when message is created after cache rebalance finished. + * Exchange ID and partitions message are not assigned by this constructor. + * + * @param topVer Topology version. + * @param assignmentChange Assignment change, asserted to be not empty. + * @param cacheDeploymentIds Cache deployment IDs.
+ */ + public CacheAffinityChangeMessage(AffinityTopologyVersion topVer, + Map<Integer, Map<Integer, List<UUID>>> assignmentChange, + Map<Integer, IgniteUuid> cacheDeploymentIds) { + assert !F.isEmpty(assignmentChange) : assignmentChange; + + this.topVer = topVer; + this.assignmentChange = assignmentChange; + this.cacheDeploymentIds = cacheDeploymentIds; + } + + /** + * Constructor used when message is created to finish exchange. + * Topology version and cache deployment IDs are not assigned by this constructor, + * and the assignment change is stored without an emptiness check. + * + * @param exchId Exchange ID. + * @param partsMsg Partitions message. + * @param assignmentChange Assignment change. + */ + public CacheAffinityChangeMessage(GridDhtPartitionExchangeId exchId, + GridDhtPartitionsFullMessage partsMsg, + Map<Integer, Map<Integer, List<UUID>>> assignmentChange) { + this.exchId = exchId; + this.partsMsg = partsMsg; + this.assignmentChange = assignmentChange; + } + + /** + * @return Cache deployment IDs, {@code null} if message was created to finish exchange. + */ + public Map<Integer, IgniteUuid> cacheDeploymentIds() { + return cacheDeploymentIds; + } + + /** + * @return {@code True} if request should trigger partition exchange. + */ + public boolean exchangeNeeded() { + return exchangeNeeded; + } + + /** + * @param exchangeNeeded {@code True} if request should trigger partition exchange. + */ + public void exchangeNeeded(boolean exchangeNeeded) { + this.exchangeNeeded = exchangeNeeded; + } + + /** + * @return Partitions message, {@code null} if message was created after rebalance finished. + */ + public GridDhtPartitionsFullMessage partitionsMessage() { + return partsMsg; + } + + /** + * @return Affinity assignment change. + */ + @Nullable public Map<Integer, Map<Integer, List<UUID>>> assignmentChange() { + return assignmentChange; + } + + /** + * @return Exchange ID, {@code null} if message was created after rebalance finished. + */ + @Nullable public GridDhtPartitionExchangeId exchangeId() { + return exchId; + } + + /** + * @return Topology version, {@code null} if message was created to finish exchange.
+ */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheAffinityChangeMessage.class, this); + } +}
