1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f083613 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f083613 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f083613 Branch: refs/heads/ignite-1093-2 Commit: 4f0836132ad99cf859c70e58e5c7bc1aa3ee7481 Parents: de424c1 8616889 Author: Anton Vinogradov <[email protected]> Authored: Mon Oct 5 18:58:52 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Oct 5 18:58:52 2015 +0300 ---------------------------------------------------------------------- RELEASE_NOTES.txt | 2 + assembly/dependencies-fabric.xml | 2 +- assembly/release-fabric.xml | 10 +- examples/schema-import/pom-standalone.xml | 90 ++++ examples/schema-import/pom.xml | 13 +- .../computegrid/ComputeClosureExample.java | 2 +- modules/apache-license-gen/README.txt | 33 ++ .../TcpDiscoveryCloudIpFinderSelfTest.java | 2 + .../org/apache/ignite/IgniteFileSystem.java | 2 + .../java/org/apache/ignite/igfs/IgfsPath.java | 2 +- .../apache/ignite/internal/IgniteKernal.java | 70 +-- .../managers/discovery/CustomEventListener.java | 4 +- .../discovery/GridDiscoveryManager.java | 28 +- .../cache/DynamicCacheChangeRequest.java | 19 + .../cache/DynamicCacheDescriptor.java | 36 ++ .../processors/cache/GridCacheContext.java | 2 +- .../processors/cache/GridCacheEventManager.java | 12 +- .../cache/GridCacheExplicitLockSpan.java | 13 +- .../processors/cache/GridCacheIoManager.java | 76 +--- .../processors/cache/GridCacheMessage.java | 7 - .../cache/GridCacheMvccCandidate.java | 5 +- .../processors/cache/GridCacheMvccManager.java | 67 +-- .../GridCachePartitionExchangeManager.java | 86 ++-- .../processors/cache/GridCacheProcessor.java | 109 +++-- .../cache/GridCacheSharedContext.java | 32 -- .../processors/cache/GridCacheUtils.java | 8 - .../distributed/GridDistributedCacheEntry.java | 2 +- .../dht/GridDhtAffinityAssignmentRequest.java | 5 - .../dht/GridDhtAffinityAssignmentResponse.java | 5 - .../cache/distributed/dht/GridDhtGetFuture.java | 4 +- .../distributed/dht/GridDhtLockRequest.java | 5 - .../distributed/dht/GridDhtTxFinishRequest.java | 5 - .../dht/GridDhtTxPrepareRequest.java | 7 +- .../dht/GridPartitionedGetFuture.java | 5 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 9 +- .../dht/colocated/GridDhtColocatedCache.java | 12 +- .../colocated/GridDhtColocatedLockFuture.java | 16 +- .../dht/preloader/GridDhtForceKeysRequest.java | 5 - .../dht/preloader/GridDhtForceKeysResponse.java | 5 - .../GridDhtPartitionDemandMessage.java | 5 - .../GridDhtPartitionSupplyMessage.java | 5 - .../GridDhtPartitionsAbstractMessage.java | 5 - .../dht/preloader/GridDhtPreloader.java | 13 +- .../distributed/near/GridNearGetFuture.java | 2 + .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../cache/local/GridLocalCacheEntry.java | 2 +- .../continuous/CacheContinuousQueryHandler.java | 10 +- .../continuous/GridContinuousProcessor.java | 17 +- .../datastructures/DataStructuresProcessor.java | 6 +- .../internal/processors/igfs/IgfsFileInfo.java | 15 +- .../internal/processors/igfs/IgfsImpl.java | 121 +----- .../processors/igfs/IgfsMetaManager.java | 427 ++++++++++++++++--- .../ignite/internal/util/GridArgumentCheck.java | 5 +- .../util/nio/GridNioRecoveryDescriptor.java | 4 +- .../ignite/igfs/IgfsFragmentizerSelfTest.java | 2 +- .../IgniteClientReconnectAbstractTest.java | 35 +- .../IgniteClientReconnectCacheTest.java | 154 +++++++ .../GridDiscoveryManagerAliveCacheSelfTest.java | 2 + .../processors/cache/CrossCacheLockTest.java | 142 ++++++ .../GridCacheAbstractFailoverSelfTest.java | 3 + .../GridCacheAbstractRemoveFailureTest.java | 3 + .../GridCacheFinishPartitionsSelfTest.java | 5 +- ...IgniteCacheAtomicPutAllFailoverSelfTest.java | 4 + .../cache/IgniteCacheCreateRestartSelfTest.java | 3 + .../cache/IgniteCacheEntryListenerTxTest.java | 4 + .../IgniteCacheP2pUnmarshallingErrorTest.java | 7 + ...CacheP2pUnmarshallingRebalanceErrorTest.java | 11 +- .../cache/IgniteCachePutAllRestartTest.java | 4 +- ...omicOffheapQueueCreateMultiNodeSelfTest.java | 5 - ...ionedAtomicQueueCreateMultiNodeSelfTest.java | 9 +- ...artitionedOffHeapValuesQueueApiSelfTest.java | 4 + ...PartitionedQueueCreateMultiNodeSelfTest.java | 16 +- ...GridCachePartitionedSetFailoverSelfTest.java | 4 + ...acheAsyncOperationsFailoverAbstractTest.java | 11 + .../CacheGetFutureHangsSelfTest.java | 156 ++++--- .../CachePutAllFailoverAbstractTest.java | 11 + .../IgniteCacheAtomicNodeRestartTest.java | 8 + .../IgniteCacheCreatePutMultiNodeSelfTest.java | 151 +++++++ .../distributed/IgniteCacheCreatePutTest.java | 125 ++++++ .../IgniteCachePutGetRestartAbstractTest.java | 3 + .../IgniteCacheSizeFailoverTest.java | 3 + .../dht/GridNearCacheTxNodeFailureSelfTest.java | 4 + ...gniteAtomicLongChangingTopologySelfTest.java | 29 +- .../IgniteCacheCrossCacheTxFailoverTest.java | 3 + .../IgniteCrossCacheTxNearEnabledSelfTest.java | 28 ++ .../dht/IgniteCrossCacheTxSelfTest.java | 213 +++++++++ ...ledFairAffinityMultiNodeFullApiSelfTest.java | 4 + .../near/GridCacheNearTxExceptionSelfTest.java | 4 + .../DataStreamerMultiThreadedSelfTest.java | 4 +- .../DataStreamerMultinodeCreateCacheTest.java | 2 + .../processors/igfs/IgfsAbstractSelfTest.java | 203 ++++++--- .../igfs/IgfsMetaManagerSelfTest.java | 6 - .../processors/igfs/IgfsMetricsSelfTest.java | 2 +- .../processors/igfs/IgfsProcessorSelfTest.java | 29 +- .../igfs/UniversalFileSystemAdapter.java | 1 - .../IgniteCacheFailoverTestSuite3.java | 5 +- .../testsuites/IgniteCacheTestSuite4.java | 12 + .../processors/hadoop/igfs/HadoopIgfsUtils.java | 36 ++ ...oopFileSystemUniversalFileSystemAdapter.java | 4 +- .../HadoopIgfs20FileSystemAbstractSelfTest.java | 7 +- .../IgniteHadoopFileSystemAbstractSelfTest.java | 5 +- .../processors/query/h2/sql/GridSqlArray.java | 52 +++ .../processors/query/h2/sql/GridSqlElement.java | 2 +- .../query/h2/sql/GridSqlFunction.java | 60 ++- .../query/h2/sql/GridSqlFunctionType.java | 3 + .../query/h2/sql/GridSqlPlaceholder.java | 7 +- .../query/h2/sql/GridSqlQueryParser.java | 84 ++-- .../processors/query/h2/sql/GridSqlType.java | 29 +- .../query/h2/sql/GridQueryParsingTest.java | 27 ++ pom.xml | 9 +- 110 files changed, 2358 insertions(+), 857 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f083613/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f083613/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f083613/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 01e366e,adc2174..e52dd3a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -318,41 -307,10 +311,38 @@@ public class GridCachePartitionExchange if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); + exchWorker.futQ.addFirst(fut); + + if (!cctx.kernalContext().clientNode()) { + + for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { + final int idx = cnt; + + cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() { + @Override public void apply(final UUID id, final GridCacheMessage m) { + if (!enterBusy()) + return; + + try { + if (m instanceof GridDhtPartitionSupplyMessageV2) + cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage( + idx, id, (GridDhtPartitionSupplyMessageV2)m); + else if (m instanceof GridDhtPartitionDemandMessage) + cctx.cacheContext(m.cacheId).preloader().handleDemandMessage( + idx, id, (GridDhtPartitionDemandMessage)m); + else + log.error("Unsupported message type: " + m.getClass().getName()); + } + finally { + leaveBusy(); + } + } + }); + } + } + new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); - onDiscoveryEvent(cctx.localNodeId(), fut); - - // Allow discovery events to get processed. - locExchFut.onDone(); - if (reconnect) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4f083613/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f083613/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f083613/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index ec9b8e7,19b461e..8521fe0 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@@ -243,133 -235,42 +243,144 @@@ public class GridDhtPreloader extends G /** {@inheritDoc} */ @Override public void onInitialExchangeComplete(@Nullable Throwable err) { - if (err == null) { + if (err == null) startFut.onDone(); + else + startFut.onDone(err); + } + + /** {@inheritDoc} */ + @Override public void onReconnected() { + startFut = new GridFutureAdapter<>(); + } + + /** {@inheritDoc} */ + @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { + demander.updateLastExchangeFuture(lastFut); + } + + /** {@inheritDoc} */ + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { + // No assignments for disabled preloader. + GridDhtPartitionTopology top = cctx.dht().topology(); + + if (!cctx.rebalanceEnabled()) + return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); + + int partCnt = cctx.affinity().partitions(); - final long start = U.currentTimeMillis(); + assert exchFut.forcePreload() || exchFut.dummyReassign() || + exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : + "Topology version mismatch [exchId=" + exchFut.exchangeId() + + ", topVer=" + top.topologyVersion() + ']'; - final CacheConfiguration cfg = cctx.config(); + GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); - if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { - U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); + AffinityTopologyVersion topVer = assigns.topologyVersion(); - demandPool.syncFuture().listen(new CI1<Object>() { - @Override public void apply(Object t) { - U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " + - "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); + for (int p = 0; p < partCnt; p++) { + if (cctx.shared().exchange().hasPendingExchange()) { + if (log.isDebugEnabled()) + log.debug("Skipping assignments creation, exchange worker has pending assignments: " + + exchFut.exchangeId()); + + break; + } + + // If partition belongs to local node. + if (cctx.affinity().localNode(p, topVer)) { + GridDhtLocalPartition part = top.localPartition(p, topVer, true); + + assert part != null; + assert part.id() == p; + + if (part.state() != MOVING) { + if (log.isDebugEnabled()) + log.debug("Skipping partition assignment (state is not MOVING): " + part); + + continue; // For. + } + + Collection<ClusterNode> picked = pickedOwners(p, topVer); + + if (picked.isEmpty()) { + top.own(part); + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + + cctx.events().addPreloadEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), + discoEvt.type(), discoEvt.timestamp()); } - }); + + if (log.isDebugEnabled()) + log.debug("Owning partition as there are no other owners: " + part); + } + else { + ClusterNode n = F.first(picked); + + GridDhtPartitionDemandMessage msg = assigns.get(n); + + if (msg == null) { + assigns.put(n, msg = new GridDhtPartitionDemandMessage( + top.updateSequence(), + exchFut.exchangeId().topologyVersion(), + cctx.cacheId())); + } + + msg.addPartition(p); + } } } - else - startFut.onDone(err); + + return assigns; } + /** {@inheritDoc} */ + @Override public void onReconnected() { + startFut = new GridFutureAdapter<>(); + + long topVer0 = cctx.kernalContext().discovery().topologyVersion(); + + assert topVer0 > 0 : topVer0; + + topVer.set(topVer0); + } + - /** {@inheritDoc} */ - @Override public void onExchangeFutureAdded() { - demandPool.onExchangeFutureAdded(); + /** + * @param p Partition. + * @param topVer Topology version. + * @return Picked owners. + */ + private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) { + Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); + + int affCnt = affNodes.size(); + + Collection<ClusterNode> rmts = remoteOwners(p, topVer); + + int rmtCnt = rmts.size(); + + if (rmtCnt <= affCnt) + return rmts; + + List<ClusterNode> sorted = new ArrayList<>(rmts); + + // Sort in descending order, so nodes with higher order will be first. + Collections.sort(sorted, CU.nodeComparator(false)); + + // Pick newest nodes. + return sorted.subList(0, affCnt); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return Nodes owning this partition. + */ + private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { + return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); } /** {@inheritDoc} */
