ignite-1.5 Fixed transaction retry logic in DataStructuresProcessor. Fixed CacheObjectBinaryProcessorImpl.metadata for client nodes to try to get the metadata from the cache if the local value is not found.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e0ef348 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e0ef348 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e0ef348 Branch: refs/heads/ignite-1537 Commit: 6e0ef3480ed7d2df33aea6c23dff69c0cd957306 Parents: f6555da Author: sboikov <[email protected]> Authored: Fri Dec 11 14:55:27 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 11 14:55:27 2015 +0300 ---------------------------------------------------------------------- .../binary/CacheObjectBinaryProcessorImpl.java | 12 +- .../datastructures/DataStructuresProcessor.java | 161 +++++++++---------- 2 files changed, 89 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6e0ef348/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 9ae8a62..12e7078 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -499,8 +499,16 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** {@inheritDoc} */ @Nullable @Override public BinaryType metadata(final int typeId) throws BinaryObjectException { try { - if (clientNode) - return clientMetaDataCache.get(typeId); + if (clientNode) { + BinaryType typeMeta = clientMetaDataCache.get(typeId); + + if (typeMeta != null) + return 
typeMeta; + + BinaryMetadata meta = metaDataCache.getTopologySafe(new PortableMetadataKey(typeId)); + + return meta != null ? meta.wrap(portableCtx) : null; + } else { PortableMetadataKey key = new PortableMetadataKey(typeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/6e0ef348/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 51c4067..cd783e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -57,15 +57,16 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.CacheType; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; @@ -504,8 +505,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ @Nullable private <T> T getAtomic(final IgniteOutClosureX<T> c, - DataStructureInfo dsInfo, - boolean create, + final DataStructureInfo dsInfo, + final boolean create, Class<? extends T> cls) throws IgniteCheckedException { @@ -527,39 +528,26 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (dataStructure != null) return dataStructure; - while (true) { - try { + return retryTopologySafe(new IgniteOutClosureX<T>() { + @Override public T applyx() throws IgniteCheckedException { if (!create) return c.applyx(); try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); + IgniteCheckedException err = + utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); if (err != null) throw err; - dataStructure = c.applyx(); + T dataStructure = c.applyx(); tx.commit(); return dataStructure; } } - catch (IgniteTxRollbackCheckedException ignore) { - // Safe to retry right away. - } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - - if (topErr == null) - throw e; - - IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); - - if (fut != null) - fut.get(); - } - } + }); } /** @@ -597,10 +585,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @param afterRmv Optional closure to run after data structure removed. * @throws IgniteCheckedException If failed. 
*/ - private <T> void removeDataStructure(IgniteOutClosureX<T> c, + private <T> void removeDataStructure(final IgniteOutClosureX<T> c, String name, DataStructureType type, - @Nullable IgniteInClosureX<T> afterRmv) + @Nullable final IgniteInClosureX<T> afterRmv) throws IgniteCheckedException { Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); @@ -608,52 +596,42 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (dsMap == null || !dsMap.containsKey(name)) return; - DataStructureInfo dsInfo = new DataStructureInfo(name, type, null); + final DataStructureInfo dsInfo = new DataStructureInfo(name, type, null); IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, false); if (err != null) throw err; - while (true) { - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - T2<Boolean, IgniteCheckedException> res = - utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get(); - - err = res.get2(); + retryTopologySafe(new IgniteOutClosureX<Void>() { + @Override public Void applyx() throws IgniteCheckedException { + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + T2<Boolean, IgniteCheckedException> res = + utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get(); - if (err != null) - throw err; + IgniteCheckedException err = res.get2(); - assert res.get1() != null; + if (err != null) + throw err; - boolean exists = res.get1(); + assert res.get1() != null; - if (!exists) - return; + boolean exists = res.get1(); - T rmvInfo = c.applyx(); + if (!exists) + return null; - tx.commit(); + T rmvInfo = c.applyx(); - if (afterRmv != null && rmvInfo != null) - afterRmv.applyx(rmvInfo); - } - catch (IgniteTxRollbackCheckedException ignore) { - // Safe to retry right away. 
- } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - - if (topErr == null) - throw e; + tx.commit(); - IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + if (afterRmv != null && rmvInfo != null) + afterRmv.applyx(rmvInfo); - if (fut != null) - fut.get(); + return null; + } } - } + }); } /** @@ -1000,7 +978,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ @Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c, - DataStructureInfo dsInfo, + final DataStructureInfo dsInfo, boolean create) throws IgniteCheckedException { @@ -1028,41 +1006,29 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { return c.applyx(cacheCtx); } - while (true) { - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - T2<String, IgniteCheckedException> res = - utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get(); - - err = res.get2(); - - if (err != null) - throw err; - - String cacheName = res.get1(); + return retryTopologySafe(new IgniteOutClosureX<T>() { + @Override public T applyx() throws IgniteCheckedException { + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + T2<String, IgniteCheckedException> res = + utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get(); - final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); + IgniteCheckedException err = res.get2(); - T col = c.applyx(cacheCtx); + if (err != null) + throw err; - tx.commit(); + String cacheName = res.get1(); - return col; - } - catch (IgniteTxRollbackCheckedException ignore) { - // Safe to retry right away. 
- } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); - if (topErr == null) - throw e; + T col = c.applyx(cacheCtx); - IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + tx.commit(); - if (fut != null) - fut.get(); + return col; + } } - } + }); } /** @@ -1659,6 +1625,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** + * @param c Closure to run. + * @throws IgniteCheckedException If failed. + * @return Closure return value. + */ + private static <T> T retryTopologySafe(IgniteOutClosureX<T> c) throws IgniteCheckedException { + for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) { + try { + return c.applyx(); + } + catch (IgniteCheckedException e) { + if (i == GridCacheAdapter.MAX_RETRIES - 1) + throw e; + + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + + if (topErr == null || (topErr instanceof ClusterTopologyServerNotFoundException)) + throw e; + + IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + + if (fut != null) + fut.get(); + } + } + + assert false; + + return null; + } + + /** * */ enum DataStructureType {
