http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index d746d54,546be37..00a4b2e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@@ -507,10 -482,10 +490,10 @@@ public abstract class GridCacheQueryMan * @throws IgniteCheckedException Thrown in case of any errors. */ @SuppressWarnings("SimplifiableIfStatement") - public void remove(CacheObject key, CacheObject val) throws IgniteCheckedException { + public void remove(KeyCacheObject key, int partId, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { assert key != null; - if (!GridQueryProcessor.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal)) + if (!QueryUtils.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal)) return; // No-op. if (!enterBusy())
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f334b84,dc4e52f..a59ff51 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@@ -36,12 -30,8 +30,12 @@@ import javax.cache.expiry.Duration import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.IgniteInternalFuture; - import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.pagemem.wal.StorageException; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; @@@ -1142,304 -963,165 +995,163 @@@ public abstract class IgniteTxLocalAdap /** * @param cacheCtx Cache context. - * @param keys Key to enlist. - * @param expiryPlc Explicitly specified expiry policy for entry. - * @param map Return map. - * @param missed Map of missed keys. - * @param keysCnt Keys count (to avoid call to {@code Collection.size()}). - * @param deserializeBinary Deserialize binary flag. - * @param skipVals Skip values flag. - * @param keepCacheObjects Keep cache objects flag. - * @param skipStore Skip store flag. - * @throws IgniteCheckedException If failed. - * @return Enlisted keys. + * @param keys Keys. + * @return Expiry policy. */ - @SuppressWarnings({"RedundantTypeArguments"}) - private <K, V> Collection<KeyCacheObject> enlistRead( - final GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - Collection<KeyCacheObject> keys, - @Nullable ExpiryPolicy expiryPlc, - Map<K, V> map, - Map<KeyCacheObject, GridCacheVersion> missed, - int keysCnt, - boolean deserializeBinary, - boolean skipVals, - boolean keepCacheObjects, - boolean skipStore, - final boolean needVer, - final boolean recovery - ) throws IgniteCheckedException { - assert !F.isEmpty(keys); - assert keysCnt == keys.size(); - - cacheCtx.checkSecurity(SecurityPermission.CACHE_READ); - - boolean single = keysCnt == 1; - - Collection<KeyCacheObject> lockKeys = null; - - AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion(); - - boolean needReadVer = (serializable() && optimistic()) || needVer; - - // In this loop we cover only read-committed or optimistic transactions. - // Transactions that are pessimistic and not read-committed are covered - // outside of this loop. - for (KeyCacheObject key : keys) { - if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals) - addActiveCache(cacheCtx, recovery); - - IgniteTxKey txKey = cacheCtx.txKey(key); - - // Check write map (always check writes first). - IgniteTxEntry txEntry = entry(txKey); - - // Either non-read-committed or there was a previous write. - if (txEntry != null) { - CacheObject val = txEntry.value(); - - if (txEntry.hasValue()) { - if (!F.isEmpty(txEntry.entryProcessors())) - val = txEntry.applyEntryProcessors(val); - - if (val != null) { - GridCacheVersion ver = null; + protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) { + return null; + } - if (needVer) { - if (txEntry.op() != READ) - ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED; - else { - ver = txEntry.entryReadVersion(); + /** + * Post lock processing for put or remove. + * + * @param cacheCtx Context. + * @param keys Keys. + * @param ret Return value. + * @param rmv {@code True} if remove. + * @param retval Flag to return value or not. + * @param read {@code True} if read. + * @param accessTtl TTL for read operation. + * @param filter Filter to check entries. + * @throws IgniteCheckedException If error. + * @param computeInvoke If {@code true} computes return value for invoke operation. + */ + @SuppressWarnings("unchecked") + protected final void postLockWrite( + GridCacheContext cacheCtx, + Iterable<KeyCacheObject> keys, + GridCacheReturn ret, + boolean rmv, + boolean retval, + boolean read, + long accessTtl, + CacheEntryPredicate[] filter, + boolean computeInvoke + ) throws IgniteCheckedException { + for (KeyCacheObject k : keys) { + IgniteTxEntry txEntry = entry(cacheCtx.txKey(k)); - if (ver == null && pessimistic()) { - while (true) { - try { - GridCacheEntryEx cached = txEntry.cached(); + if (txEntry == null) + throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " + + "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']'); - ver = cached.isNear() ? - ((GridNearCacheEntry)cached).dhtVersion() : cached.version(); + while (true) { + GridCacheEntryEx cached = txEntry.cached(); - break; - } - catch (GridCacheEntryRemovedException ignored) { - txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); - } - } - } + try { + assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() : + "Transaction lock is not acquired [entry=" + cached + ", tx=" + this + + ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']'; - if (ver == null) { - assert optimistic() && repeatableRead() : this; + if (log.isDebugEnabled()) + log.debug("Post lock write entry: " + cached); - ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET; - } - } + CacheObject v = txEntry.previousValue(); + boolean hasPrevVal = txEntry.hasPreviousValue(); - assert ver != null; - } + if (onePhaseCommit()) + filter = txEntry.filters(); - cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, - ver, 0, 0); - } - } - else { - assert txEntry.op() == TRANSFORM; + // If we have user-passed filter, we must read value into entry for peek(). + if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter)) + retval = true; - while (true) { - try { - GridCacheVersion readVer = null; - EntryGetResult getRes = null; + boolean invoke = txEntry.op() == TRANSFORM; - Object transformClo = - (txEntry.op() == TRANSFORM && - cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? - F.first(txEntry.entryProcessors()) : null; + if (retval || invoke) { + if (!cacheCtx.isNear()) { + if (!hasPrevVal) { + // For non-local cache should read from store after lock on primary. + boolean readThrough = cacheCtx.isLocal() && + (invoke || cacheCtx.loadPreviousValue()) && + !txEntry.skipStore(); - if (needVer) { - getRes = txEntry.cached().innerGetVersioned( + v = cached.innerGet( null, this, - /*update-metrics*/true, - /*event*/!skipVals, - /*swap*/true, + readThrough, + /*metrics*/!invoke, + /*event*/!invoke && !dht(), - /*temporary*/false, CU.subjectId(this, cctx), - transformClo, - resolveTaskName(), - null, - txEntry.keepBinary(), - null); - - if (getRes != null) { - val = getRes.value(); - readVer = getRes.version(); - } - } - else { - val = txEntry.cached().innerGet( null, - this, - /*read-through*/false, - /*metrics*/true, - /*event*/!skipVals, - CU.subjectId(this, cctx), - transformClo, resolveTaskName(), null, txEntry.keepBinary()); } - - if (val != null) { - if (!readCommitted() && !skipVals) - txEntry.readValue(val); - - if (!F.isEmpty(txEntry.entryProcessors())) - val = txEntry.applyEntryProcessors(val); - - cacheCtx.addResult(map, - key, - val, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - getRes, - readVer, - 0, - 0, - needVer); - } - else - missed.put(key, txEntry.cached().version()); - - break; } - catch (GridCacheEntryRemovedException ignored) { - txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); + else { + if (!hasPrevVal) - v = cached.rawGetOrUnmarshal(false); ++ v = cached.rawGet(); } - } - } - } - // First time access within transaction. - else { - if (lockKeys == null && !skipVals) - lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt); - if (!single && !skipVals) - lockKeys.add(key); + if (txEntry.op() == TRANSFORM) { + if (computeInvoke) { + GridCacheVersion ver; + + try { + ver = cached.version(); + } + catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; - while (true) { - GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer); + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); - try { - GridCacheVersion ver = entry.version(); - - CacheObject val = null; - GridCacheVersion readVer = null; - EntryGetResult getRes = null; - - if (!pessimistic() || readCommitted() && !skipVals) { - IgniteCacheExpiryPolicy accessPlc = - optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null; - - if (needReadVer) { - getRes = primaryLocal(entry) ? - entry.innerGetVersioned( - null, - this, - /*metrics*/true, - /*event*/true, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - accessPlc, - !deserializeBinary, - null) : null; - - if (getRes != null) { - val = getRes.value(); - readVer = getRes.version(); + ver = null; } - } - else { - val = entry.innerGet( - null, - this, - /*no read-through*/false, - /*metrics*/true, - /*event*/true, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - accessPlc, - !deserializeBinary); - } - if (val != null) { - cacheCtx.addResult(map, - key, - val, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - getRes, - readVer, - 0, - 0, - needVer); + addInvokeResult(txEntry, v, ret, ver); } - else - missed.put(key, ver); } else - // We must wait for the lock in pessimistic mode. - missed.put(key, ver); - - if (!readCommitted() && !skipVals) { - txEntry = addEntry(READ, - val, - null, - null, - entry, - expiryPlc, - null, - true, - -1L, - -1L, - null, - skipStore, - !deserializeBinary); - - // As optimization, mark as checked immediately - // for non-pessimistic if value is not null. - if (val != null && !pessimistic()) { - txEntry.markValid(); - - if (needReadVer) { - assert readVer != null; - - txEntry.entryReadVersion(readVer); - } - } - } - - break; // While. + ret.value(cacheCtx, v, txEntry.keepBinary()); } - catch (GridCacheEntryRemovedException ignored) { + + boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter); + + // For remove operation we return true only if we are removing s/t, + // i.e. cached value is not null. + ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null)); + + if (onePhaseCommit()) + txEntry.filtersPassed(pass); + + boolean updateTtl = read; + + if (pass) { + txEntry.markValid(); + if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key); + log.debug("Filter passed in post lock for key: " + k); } - finally { - if (entry != null && readCommitted()) { - if (cacheCtx.isNear()) { - if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) { - if (entry.markObsolete(xidVer)) - cacheCtx.cache().removeEntry(entry); - } - } - else - entry.context().evicts().touch(entry, topVer); + else { + // Revert operation to previous. (if no - NOOP, so entry will be unlocked). + txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value())); + txEntry.filters(CU.empty0()); + txEntry.filtersSet(false); + + updateTtl = !cacheCtx.putIfAbsentFilter(filter); + } + + if (updateTtl) { + if (!read) { + ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry); + + if (expiryPlc != null) + txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess())); } + else + txEntry.ttl(accessTtl); } + + break; // While. + } + // If entry cached within transaction got removed before lock. + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in putAllAsync method (will retry): " + cached); + + txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion())); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index eb3524b,e0549fb..4726d86 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@@ -35,10 -35,10 +35,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; +import org.apache.ignite.internal.processors.cache.IncompleteCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; - import org.apache.ignite.internal.processors.query.GridQueryProcessor; + import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@@ -160,13 -174,13 +160,13 @@@ public class IgniteCacheObjectProcessor } /** {@inheritDoc} */ - @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) { + @Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) throws IgniteCheckedException { switch (type) { case CacheObject.TYPE_BYTE_ARR: - return new CacheObjectByteArrayImpl(bytes); + throw new IllegalArgumentException("Byte arrays cannot be used as cache keys."); case CacheObject.TYPE_REGULAR: - return new KeyCacheObjectImpl(ctx.processor().unmarshal(ctx, bytes, null), bytes); - return new CacheObjectImpl(null, bytes); ++ return new KeyCacheObjectImpl(ctx.processor().unmarshal(ctx, bytes, null), bytes, -1); } throw new IllegalArgumentException("Invalid object type: " + type); @@@ -299,8 -242,10 +299,8 @@@ @Override public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException { assert ccfg != null; - CacheMemoryMode memMode = ccfg.getMemoryMode(); - boolean storeVal = !ccfg.isCopyOnRead() || (!isBinaryEnabled(ccfg) && - (GridQueryProcessor.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled())); + (QueryUtils.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled())); CacheObjectContext res = new CacheObjectContext(ctx, ccfg.getName(), @@@ -421,18 -362,10 +417,18 @@@ Object val = ctx.processor().unmarshal(ctx, valBytes, ldr); - KeyCacheObjectImpl key = new KeyCacheObjectImpl(val, valBytes); - return new KeyCacheObjectImpl(val, valBytes, partition()); ++ KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition()); + + key.partition(partition()); + + return key; } - KeyCacheObjectImpl key = new KeyCacheObjectImpl(val, valBytes); - return new KeyCacheObjectImpl(val, valBytes, partition()); ++ KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition()); + + key.partition(partition()); + + return key; } catch (IgniteCheckedException e) { throw new IgniteException("Failed to marshal object: " + val, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 124cb4b,cfe94b2..075fb06 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@@ -121,11 -139,30 +139,30 @@@ public class ClusterProcessor extends G } } + + /** + * @param vals collection to seek through. + */ + private Boolean findLastFlag(Collection<Serializable> vals) { + Boolean flag = null; + + for (Serializable ser : vals) { + if (ser != null) { + Map<String, Object> map = (Map<String, Object>) ser; + + if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) + flag = (Boolean) map.get(ATTR_UPDATE_NOTIFIER_STATUS); + } + } + + return flag; + } + /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException { if (notifyEnabled.get()) { try { - verChecker = new GridUpdateNotifier(ctx.gridName(), + verChecker = new GridUpdateNotifier(ctx.igniteInstanceName(), VER_STR, ctx.gateway(), ctx.plugins().allProviders(), http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 87d54a1,0000000..e5919e0 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@@ -1,947 -1,0 +1,944 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + - import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterGroupAdapter; +import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; - import org.apache.ignite.internal.processors.cache.ClusterState; +import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage; ++import org.apache.ignite.internal.processors.cache.ClusterState; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; ++import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE; +import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE; +import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION; + +/** + * + */ +public class GridClusterStateProcessor extends GridProcessorAdapter { + /** Global status. */ + private volatile ClusterState globalState; + + /** Action context. */ + private volatile ChangeGlobalStateContext lastCgsCtx; + + /** Local action future. */ + private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>(); + + /** Process. */ + @GridToStringExclude + private GridCacheProcessor cacheProc; + + /** Shared context. */ + @GridToStringExclude + private GridCacheSharedContext<?, ?> sharedCtx; + + //todo may be add init latch + + /** Listener. */ + private final GridLocalEventListener lsr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt != null; + + final DiscoveryEvent e = (DiscoveryEvent)evt; + + assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this; + + final GridChangeGlobalStateFuture f = cgsLocFut.get(); + + if (f != null) + f.initFut.listen(new CI1<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + f.onDiscoveryEvent(e); + } + }); + } + }; + + /** + * @param ctx Kernal context. + */ + public GridClusterStateProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start(boolean activeOnStart) throws IgniteCheckedException { + super.start(activeOnStart); + + globalState = activeOnStart ? ACTIVE : INACTIVE; + cacheProc = ctx.cache(); + sharedCtx = cacheProc.context(); + + sharedCtx.io().addHandler(0, + GridChangeGlobalStateMessageResponse.class, + new CI2<UUID, GridChangeGlobalStateMessageResponse>() { + @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { + processChangeGlobalStateResponse(nodeId, msg); + } + }); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() { + @Override public void onCustomEvent( + AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) { + assert topVer != null; + assert snd != null; + assert msg != null; + + boolean activate = msg.activate(); + + ChangeGlobalStateContext actx = lastCgsCtx; + + if (actx != null && globalState == TRANSITION) { + GridChangeGlobalStateFuture f = cgsLocFut.get(); + + if (log.isDebugEnabled()) + log.debug("Concurrent " + prettyStr(activate) + " [id=" + + ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]"); + + if (f != null && f.requestId.equals(msg.requestId())) + f.onDone(new IgniteCheckedException( + "Concurrent change state, now in progress=" + (activate) + + ", initiatingNodeId=" + actx.initiatingNodeId + + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId() + )); + + msg.concurrentChangeState(); + } + else { + if (log.isInfoEnabled()) + log.info("Create " + prettyStr(activate) + " context [id=" + + ctx.localNodeId() + " topVer=" + topVer + ", reqId=" + + msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]"); + + lastCgsCtx = new ChangeGlobalStateContext( + msg.requestId(), + msg.initiatorNodeId(), + msg.getDynamicCacheChangeBatch(), + msg.activate()); + + globalState = TRANSITION; + } + } + }); + + ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class); + ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); + + IgniteCheckedException stopErr = new IgniteInterruptedCheckedException( - "Node is stopping: " + ctx.gridName()); ++ "Node is stopping: " + ctx.igniteInstanceName()); + + GridChangeGlobalStateFuture f = cgsLocFut.get(); + + if (f != null) + f.onDone(stopErr); + + cgsLocFut.set(null); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { + return DiscoveryDataExchangeType.STATE_PROC; + } + + /** {@inheritDoc} */ - @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { - return globalState; ++ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { ++ dataBag.addGridCommonData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState); + } + + /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { - if (ctx.localNodeId().equals(joiningNodeId)) - globalState = (ClusterState)data; ++ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { ++ globalState = (ClusterState)data.commonData(); + } + + /** + * + */ + public IgniteInternalFuture<?> changeGlobalState(final boolean activate) { + if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) + throw new IgniteException("Cannot " + prettyStr(activate) + " cluster, because cache locked on transaction."); + - if ((this.globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate)) ++ if ((globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate)) + return new GridFinishedFuture<>(); + + final UUID requestId = UUID.randomUUID(); + + final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx); + + if (!cgsLocFut.compareAndSet(null, cgsFut)) { + GridChangeGlobalStateFuture locF = cgsLocFut.get(); + + if (locF.activate == activate) + return locF; + else + return new GridFinishedFuture<>(new IgniteException( + "fail " + prettyStr(activate) + ", because now in progress" + prettyStr(locF.activate))); + } + + try { + if (ctx.clientNode()) { + AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); + + IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()) + .compute().withAsync(); + + if (log.isInfoEnabled()) + log.info("Send " + prettyStr(activate) + " request from client node [id=" + + ctx.localNodeId() + " topVer=" + topVer + " ]"); + + comp.run(new ClientChangeGlobalStateComputeRequest(activate)); + + comp.future().listen(new CI1<IgniteFuture>() { + @Override public void apply(IgniteFuture fut) { + try { + fut.get(); + + cgsFut.onDone(); + } + catch (Exception e) { + cgsFut.onDone(e); + } + } + }); + } + else { + List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); + + DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest( + requestId, null, ctx.localNodeId()); + + changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE); + + reqs.add(changeGlobalStateReq); + + reqs.addAll(activate ? cacheProc.startAllCachesRequests() : cacheProc.stopAllCachesRequests()); + + ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage( + requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs)); + + try { + ctx.discovery().sendCustomEvent(changeGlobalStateMsg); + + if (ctx.isStopping()) + cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " + + "node is stopping.")); + } + catch (IgniteCheckedException e) { + log.error("Fail create or send change global state request." + cgsFut, e); + + cgsFut.onDone(e); + } + } + } + catch (IgniteCheckedException e) { + log.error("Fail create or send change global state request." + cgsFut, e); + + cgsFut.onDone(e); + } + + return cgsFut; + } + + /** + * + */ + public boolean active() { + ChangeGlobalStateContext actx = lastCgsCtx; + + if (actx != null && !actx.activate && globalState == TRANSITION) + return true; + + if (actx != null && actx.activate && globalState == TRANSITION) + return false; + + return globalState == ACTIVE; + } + + /** + * @param reqs Requests. + */ + public boolean changeGlobalState( + Collection<DynamicCacheChangeRequest> reqs, + AffinityTopologyVersion topVer + ) { + assert !F.isEmpty(reqs); + assert topVer != null; + + for (DynamicCacheChangeRequest req : reqs) + if (req.globalStateChange()) { + ChangeGlobalStateContext cgsCtx = lastCgsCtx; + + assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray()); + + cgsCtx.topologyVersion(topVer); + + return true; + } + + + return false; + } + + /** + * Invoke from exchange future. + */ + public Exception onChangeGlobalState() { + GridChangeGlobalStateFuture f = cgsLocFut.get(); + + ChangeGlobalStateContext cgsCtx = lastCgsCtx; + + assert cgsCtx != null; + + if (f != null) + f.setRemaining(cgsCtx.topVer); + + return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx); + } + + /** + * @param exs Exs. + */ + public void onFullResponseMessage(Map<UUID, Exception> exs) { + assert !F.isEmpty(exs); + + ChangeGlobalStateContext actx = lastCgsCtx; + + actx.setFail(); + + // revert change if activation request fail + if (actx.activate) { + try { + cacheProc.onKernalStopCaches(true); + + cacheProc.stopCaches(true); + + sharedCtx.affinity().removeAllCacheInfo(); + + if (!ctx.clientNode()) { + sharedCtx.database().onDeActivate(ctx); + + if (sharedCtx.pageStore() != null) + sharedCtx.pageStore().onDeActivate(ctx); + + if (sharedCtx.wal() != null) + sharedCtx.wal().onDeActivate(ctx); + } + } + catch (Exception e) { + for (Map.Entry<UUID, Exception> entry : exs.entrySet()) + e.addSuppressed(entry.getValue()); + + log.error("Fail while revert activation request changes", e); + } + } + else { + //todo revert change if deactivate request fail + } + + globalState = actx.activate ? INACTIVE : ACTIVE; + + GridChangeGlobalStateFuture af = cgsLocFut.get(); + + if (af != null && af.requestId.equals(actx.requestId)) { + IgniteCheckedException e = new IgniteCheckedException("see suppressed"); + + for (Map.Entry<UUID, Exception> entry : exs.entrySet()) + e.addSuppressed(entry.getValue()); + + af.onDone(e); + } + } + + /** + * + */ + private Exception onActivate(ChangeGlobalStateContext cgsCtx) { + final boolean client = ctx.clientNode(); + + if (log.isInfoEnabled()) + log.info("Start activation process [nodeId=" + this.ctx.localNodeId() + ", client=" + client + + ", topVer=" + cgsCtx.topVer + "]"); + + Collection<CacheConfiguration> cfgs = new ArrayList<>(); + + for (DynamicCacheChangeRequest req : cgsCtx.batch.requests()) { + if (req.startCacheConfiguration() != null) + cfgs.add(req.startCacheConfiguration()); + } + + try { + if (!client) { + sharedCtx.database().lock(); + + IgnitePageStoreManager pageStore = sharedCtx.pageStore(); + + if (pageStore != null) + pageStore.onActivate(ctx); + + if (sharedCtx.wal() != null) + sharedCtx.wal().onActivate(ctx); + + sharedCtx.database().initDataBase(); + + for (CacheConfiguration cfg : cfgs) { + if (CU.isSystemCache(cfg.getName())) + if (pageStore != null) + pageStore.initializeForCache(cfg); + } + + for (CacheConfiguration cfg : cfgs) { + if (!CU.isSystemCache(cfg.getName())) + if (pageStore != null) + pageStore.initializeForCache(cfg); + } + + sharedCtx.database().onActivate(ctx); + } + + if (log.isInfoEnabled()) + log.info("Success activate wal, dataBase, pageStore [nodeId=" + + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); + + return null; + } + catch (Exception e) { + log.error("Fail activate wal, dataBase, pageStore [nodeId=" + ctx.localNodeId() + ", client=" + client + + ", topVer=" + cgsCtx.topVer + "]", e); + + if (!ctx.clientNode()) + sharedCtx.database().unLock(); + + return e; + } + } + + /** + * + */ + public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) { + final boolean client = ctx.clientNode(); + + if (log.isInfoEnabled()) + log.info("Start deactivate process [id=" + ctx.localNodeId() + ", client=" + + client + ", topVer=" + cgsCtx.topVer + "]"); + + try { + ctx.dataStructures().onDeActivate(ctx); + + ctx.service().onDeActivate(ctx); + + if (log.isInfoEnabled()) + log.info("Success deactivate services, dataStructures, database, pageStore, wal [id=" + ctx.localNodeId() + ", client=" + + client + ", topVer=" + cgsCtx.topVer + "]"); + + return null; + } + catch (Exception e) { + log.error("DeActivation fail [nodeId=" + ctx.localNodeId() + ", client=" + client + + ", topVer=" + cgsCtx.topVer + "]", e); + + return e; + } + finally { + if (!client) + sharedCtx.database().unLock(); + } + } + + /** + * + */ + private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) { + IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + boolean client = ctx.clientNode(); + + Exception e = null; + + try { - ctx.marshallerContext().onMarshallerCacheStarted(ctx); - + if (!ctx.config().isDaemon()) + ctx.cacheObjects().onUtilityCacheStarted(); + + ctx.service().onUtilityCacheStarted(); + + ctx.service().onActivate(ctx); + + ctx.dataStructures().onActivate(ctx); + + if (log.isInfoEnabled()) + log.info("Success final activate [nodeId=" + + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); + } + catch (Exception ex) { + e = ex; + + log.error("Fail activate finished [nodeId=" + ctx.localNodeId() + ", client=" + client + + ", topVer=" + GridClusterStateProcessor.this.lastCgsCtx.topVer + "]", ex); + } + finally { + globalState = ACTIVE; + + sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e); + + GridClusterStateProcessor.this.lastCgsCtx = null; + } + } + }); + + cgsCtx.setAsyncActivateFut(asyncActivateFut); + } + + /** + * + */ + public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) { + final boolean client = ctx.clientNode(); + + if (log.isInfoEnabled()) + log.info("Success final deactivate [nodeId=" + + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); + + Exception ex = null; + + try { + if (!client) { + sharedCtx.database().onDeActivate(ctx); + + if (sharedCtx.pageStore() != null) + sharedCtx.pageStore().onDeActivate(ctx); + + if (sharedCtx.wal() != null) + sharedCtx.wal().onDeActivate(ctx); + + sharedCtx.affinity().removeAllCacheInfo(); + } + } + catch (Exception e) { + ex = e; + } + finally { + globalState = INACTIVE; + } + + sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex); + + this.lastCgsCtx = null; + } + + /** + * + */ + public void onExchangeDone() { + ChangeGlobalStateContext cgsCtx = lastCgsCtx; + + assert cgsCtx != null; + + if (!cgsCtx.isFail()) { + if (cgsCtx.activate) + onFinalActivate(cgsCtx); + else + onFinalDeActivate(cgsCtx); + } + else + lastCgsCtx = null; + } + + /** + * @param initNodeId Initialize node id. + * @param ex Exception. + */ + private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) { + assert requestId != null; + assert initNodeId != null; + + try { + GridChangeGlobalStateMessageResponse actResp = new GridChangeGlobalStateMessageResponse(requestId, ex); + + if (log.isDebugEnabled()) + log.debug("Send change global state response [nodeId=" + ctx.localNodeId() + + ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + actResp + "]"); + + if (ctx.localNodeId().equals(initNodeId)) + processChangeGlobalStateResponse(ctx.localNodeId(), actResp); + else + sharedCtx.io().send(initNodeId, actResp, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + log.error("Fail send change global state response to " + initNodeId, e); + } + } + + /** + * @param msg Message. + */ + private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) { + assert nodeId != null; + assert msg != null; + + if (log.isDebugEnabled()) + log.debug("Received activation response [requestId=" + msg.getRequestId() + + ", nodeId=" + nodeId + "]"); + + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) { + U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" + + msg.getRequestId() + ']'); + + return; + } + + UUID requestId = msg.getRequestId(); + + final GridChangeGlobalStateFuture fut = cgsLocFut.get(); + + if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) { + fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + fut.onResponse(nodeId, msg); + } + }); + } + } + + + + /** + * @param activate Activate. + */ + private String prettyStr(boolean activate) { + return activate ? "activate" : "deactivate"; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridClusterStateProcessor.class, this); + } + + /** + * + */ + private static class GridChangeGlobalStateFuture extends GridFutureAdapter { + /** Request id. */ + @GridToStringInclude + private final UUID requestId; + + /** Activate. */ + private final boolean activate; + + /** Nodes. */ + @GridToStringInclude + private final Set<UUID> remaining = new HashSet<>(); + + /** Responses. */ + @GridToStringInclude + private final Map<UUID, GridChangeGlobalStateMessageResponse> resps = new HashMap<>(); + + /** Context. */ + @GridToStringExclude + private final GridKernalContext ctx; + + /** */ + @GridToStringExclude + private final Object mux = new Object(); + + /** */ + @GridToStringInclude + private final GridFutureAdapter initFut = new GridFutureAdapter(); + + /** Grid logger. */ + @GridToStringExclude + private final IgniteLogger log; + + /** + * + */ + public GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) { + this.requestId = requestId; + this.activate = activate; + this.ctx = ctx; + this.log = ctx.log(getClass()); + } + + /** + * @param event Event. + */ + public void onDiscoveryEvent(DiscoveryEvent event) { + assert event != null; + + if (isDone()) + return; + + boolean allReceived = false; + + synchronized (mux) { + if (remaining.remove(event.eventNode().id())) + allReceived = remaining.isEmpty(); + } + + if (allReceived) + onAllReceived(); + } + + /** + * + */ + public void setRemaining(AffinityTopologyVersion topVer) { + Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer); + + List<UUID> ids = new ArrayList<>(nodes.size()); + + for (ClusterNode n : nodes) + ids.add(n.id()); + + if (log.isDebugEnabled()) + log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" + + ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() + + ", nodes=" + Arrays.toString(ids.toArray()) + "]"); + + synchronized (mux) { + remaining.addAll(ids); + } + + initFut.onDone(); + } + + /** + * @param msg Activation message response. + */ + public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { + assert msg != null; + + if (isDone()) + return; + + boolean allReceived = false; + + synchronized (mux) { + if (remaining.remove(nodeId)) + allReceived = remaining.isEmpty(); + + resps.put(nodeId, msg); + } + + if (allReceived) + onAllReceived(); + } + + /** + * + */ + private void onAllReceived() { + Throwable e = new Throwable(); + + boolean fail = false; + + for (Map.Entry<UUID, GridChangeGlobalStateMessageResponse> entry : resps.entrySet()) { + GridChangeGlobalStateMessageResponse r = entry.getValue(); + + if (r.getError() != null) { + fail = true; + + e.addSuppressed(r.getError()); + } + } + + if (fail) + onDone(e); + else + onDone(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + ctx.state().cgsLocFut.set(null); + + return super.onDone(res, err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridChangeGlobalStateFuture.class, this); + } + } + + /** + * + * + */ + private static class ChangeGlobalStateContext { + /** Request id. */ + private final UUID requestId; + + /** Initiating node id. */ + private final UUID initiatingNodeId; + + /** Batch requests. */ + private final DynamicCacheChangeBatch batch; + + /** Activate. */ + private final boolean activate; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Fail. */ + private boolean fail; + + /** Async activate future. */ + private IgniteInternalFuture<?> asyncActivateFut; + + /** + * + */ + public ChangeGlobalStateContext( + UUID requestId, + UUID initiatingNodeId, + DynamicCacheChangeBatch batch, + boolean activate + ) { + this.requestId = requestId; + this.batch = batch; + this.activate = activate; + this.initiatingNodeId = initiatingNodeId; + } + + /** + * @param topVer Topology version. + */ + public void topologyVersion(AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + + /** + * + */ + private void setFail() { + fail = true; + } + + /** + * + */ + private boolean isFail() { + return fail; + } + + /** + * + */ + public IgniteInternalFuture<?> getAsyncActivateFut() { + return asyncActivateFut; + } + + /** + * @param asyncActivateFut Async activate future. + */ + public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) { + this.asyncActivateFut = asyncActivateFut; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ChangeGlobalStateContext.class, this); + } + } + + /** + * + */ + private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; + + /** Activation. */ + private final boolean activation; + + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * + */ + private ClientChangeGlobalStateComputeRequest(boolean activation) { + this.activation = activation; + } + + /** {@inheritDoc} */ + @Override public void run() { + ignite.active(activation); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java index 5e15b46,3178e92..57ae7c6 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java @@@ -52,10 -39,18 +49,8 @@@ import org.apache.ignite.lang.IgniteClo import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; - import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.IgniteSystemProperties.getBoolean; - import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; - import static org.apache.ignite.igfs.IgfsMode.PROXY; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 0000000,66c19a0..6d59f89 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@@ -1,0 -1,363 +1,363 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.marshaller; + + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.UUID; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; + import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.cluster.ClusterNode; + import org.apache.ignite.events.DiscoveryEvent; + import org.apache.ignite.events.Event; + import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; + import org.apache.ignite.internal.MarshallerContextImpl; + import org.apache.ignite.internal.managers.communication.GridIoManager; + import org.apache.ignite.internal.managers.communication.GridMessageListener; + import org.apache.ignite.internal.managers.discovery.CustomEventListener; + import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; + import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; + import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; + import org.apache.ignite.internal.processors.GridProcessorAdapter; + import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + import org.apache.ignite.internal.processors.closure.GridClosureProcessor; + import org.apache.ignite.internal.util.future.GridFutureAdapter; + import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.lang.IgniteFuture; + import org.apache.ignite.spi.discovery.DiscoveryDataBag; + import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; + import org.jetbrains.annotations.Nullable; + import org.jsr166.ConcurrentHashMap8; + + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; + import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; + import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC; + import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH; + import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; + + /** + * Processor responsible for managing custom {@link DiscoveryCustomMessage} + * events for exchanging marshalling mappings between nodes in grid. + * + * In particular it processes two flows: + * <ul> + * <li> + * Some node, server or client, wants to add new mapping for some class. + * In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used. + * </li> + * <li> + * As discovery events are delivered to clients asynchronously, + * client node may not have some mapping when server nodes in the grid are already allowed to use the mapping. + * In that situation client sends a {@link MissingMappingRequestMessage} request + * and processor handles it as well as {@link MissingMappingResponseMessage} message. + * </li> + * </ul> + */ + public class GridMarshallerMappingProcessor extends GridProcessorAdapter { + /** */ + private final MarshallerContextImpl marshallerCtx; + + /** */ + private final GridClosureProcessor closProc; + + /** */ + private final List<MappingUpdatedListener> mappingUpdatedLsnrs = new CopyOnWriteArrayList<>(); + + /** */ + private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap + = new ConcurrentHashMap8<>(); + + /** */ + private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>(); + + /** + * @param ctx Kernal context. + */ + public GridMarshallerMappingProcessor(GridKernalContext ctx) { + super(ctx); + + marshallerCtx = ctx.marshallerContext(); + + closProc = ctx.closure(); + } + + /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { ++ @Override public void start(boolean activeOnStart) throws IgniteCheckedException { + GridDiscoveryManager discoMgr = ctx.discovery(); + GridIoManager ioMgr = ctx.io(); + + MarshallerMappingTransport transport = new MarshallerMappingTransport( + ctx, + mappingExchangeSyncMap, + clientReqSyncMap + ); ++ + marshallerCtx.onMarshallerProcessorStarted(ctx, transport); + + discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener()); + + discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener()); + + if (ctx.clientNode()) + ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener()); + else + ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr)); + + if (ctx.clientNode()) + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + DiscoveryEvent evt0 = (DiscoveryEvent) evt; + + if (!ctx.isStopping()) { + for (ClientRequestFuture fut : clientReqSyncMap.values()) + fut.onNodeLeft(evt0.eventNode().id()); + } + } + }, EVT_NODE_LEFT, EVT_NODE_FAILED); + } + + /** + * Adds a listener to be notified when mapping changes. + * + * @param mappingUpdatedListener listener for mapping updated events. + */ + public void addMappingUpdatedListener(MappingUpdatedListener mappingUpdatedListener) { + mappingUpdatedLsnrs.add(mappingUpdatedListener); + } + + /** + * Gets an iterator over all current mappings. + * + * @return Iterator over current mappings. + */ + public Iterator<Map.Entry<Byte, Map<Integer, String>>> currentMappings() { + return marshallerCtx.currentMappings(); + } + + /** + * + */ + private final class MissingMappingRequestListener implements GridMessageListener { + /** */ + private final GridIoManager ioMgr; + + /** + * @param ioMgr Io manager. + */ + MissingMappingRequestListener(GridIoManager ioMgr) { + this.ioMgr = ioMgr; + } + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof MissingMappingRequestMessage : msg; + + MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg; + + byte platformId = msg0.platformId(); + int typeId = msg0.typeId(); + + String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId); + + try { + ioMgr.sendToGridTopic( + nodeId, + TOPIC_MAPPING_MARSH, + new MissingMappingResponseMessage(platformId, typeId, resolvedClsName), + SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send missing mapping response.", e); + } + } + } + + /** + * + */ + private final class MissingMappingResponseListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof MissingMappingResponseMessage : msg; + + MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg; + + byte platformId = msg0.platformId(); + int typeId = msg0.typeId(); + String resolvedClsName = msg0.className(); + + MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, null); + + GridFutureAdapter<MappingExchangeResult> fut = clientReqSyncMap.get(item); + + if (fut != null) { + if (resolvedClsName != null) { + marshallerCtx.onMissedMappingResolved(item, resolvedClsName); + + fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName)); + } + else + fut.onDone(MappingExchangeResult.createFailureResult( + new IgniteCheckedException( + "Failed to resolve mapping [platformId: " + + platformId + + ", typeId: " + + typeId + "]"))); + } + } + } + + /** + * + */ + private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> { + /** {@inheritDoc} */ + @Override public void onCustomEvent( + AffinityTopologyVersion topVer, + ClusterNode snd, + MappingProposedMessage msg + ) { + if (!ctx.isStopping()) { + if (msg.duplicated()) + return; + + if (!msg.inConflict()) { + MarshallerMappingItem item = msg.mappingItem(); + String conflictingName = marshallerCtx.onMappingProposed(item); + + if (conflictingName != null) { + if (conflictingName.equals(item.className())) + msg.markDuplicated(); + else + msg.conflictingWithClass(conflictingName); + } + } + else { + UUID origNodeId = msg.origNodeId(); + + if (origNodeId.equals(ctx.localNodeId())) { + GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(msg.mappingItem()); + + assert fut != null: msg; + + fut.onDone(MappingExchangeResult.createFailureResult( + duplicateMappingException(msg.mappingItem(), msg.conflictingClassName()))); + } + } + } + } + + /** + * @param mappingItem Mapping item. + * @param conflictingClsName Conflicting class name. + */ + private IgniteCheckedException duplicateMappingException( + MarshallerMappingItem mappingItem, + String conflictingClsName + ) { + return new IgniteCheckedException("Duplicate ID [platformId=" + + mappingItem.platformId() + + ", typeId=" + + mappingItem.typeId() + + ", oldCls=" + + conflictingClsName + + ", newCls=" + + mappingItem.className() + "]"); + } + } + + /** + * + */ + private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> { + /** {@inheritDoc} */ + @Override public void onCustomEvent( + AffinityTopologyVersion topVer, + ClusterNode snd, + MappingAcceptedMessage msg + ) { + final MarshallerMappingItem item = msg.getMappingItem(); + marshallerCtx.onMappingAccepted(item); + + closProc.runLocalSafe(new Runnable() { + @Override public void run() { + for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs) + lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className()); + } + }); + + GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item); + + if (fut != null) + fut.onDone(MappingExchangeResult.createSuccessfulResult(item.className())); + } + } + + /** {@inheritDoc} */ + @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { + if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal())) + dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings()); + } + + /** {@inheritDoc} */ + @Override public void onGridDataReceived(GridDiscoveryData data) { + List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>) data.commonData(); + + if (mappings != null) { + for (int i = 0; i < mappings.size(); i++) { + Map<Integer, MappedName> map; + + if ((map = mappings.get(i)) != null) + marshallerCtx.onMappingDataReceived((byte) i, map); + } + } + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException( + ctx.cluster().clientReconnectFuture(), + "Failed to propose or request mapping, client node disconnected."))); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + marshallerCtx.onMarshallerProcessorStop(); + + cancelFutures(MappingExchangeResult.createExchangeDisabledResult()); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { + return MARSHALLER_PROC; + } + + /** + * @param res Response. + */ + private void cancelFutures(MappingExchangeResult res) { + for (GridFutureAdapter<MappingExchangeResult> fut : mappingExchangeSyncMap.values()) + fut.onDone(res); + + for (GridFutureAdapter<MappingExchangeResult> fut : clientReqSyncMap.values()) + fut.onDone(res); + } + }
