IGNITE-1299: Implemented IGFS file unlock with retries.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fe3e8fd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fe3e8fd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fe3e8fd Branch: refs/heads/ignite-1093 Commit: 9fe3e8fd884f2f19bfe3fb39c9cda89c7ae495d8 Parents: e4ba2eb Author: iveselovskiy <[email protected]> Authored: Thu Aug 27 12:11:13 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Aug 27 12:11:13 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteInternalFuture.java | 10 ++++ .../query/GridCacheQueryFutureAdapter.java | 7 +++ .../processors/igfs/IgfsMetaManager.java | 43 +++++++-------- .../internal/processors/igfs/IgfsUtils.java | 52 ++++++++++++++++++ .../util/future/GridFinishedFuture.java | 5 ++ .../internal/util/future/GridFutureAdapter.java | 58 +++++++++++++------- 6 files changed, 131 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java index 2b7b821..74cfb06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java @@ -69,6 +69,16 @@ public interface IgniteInternalFuture<R> { public R get(long timeout, TimeUnit unit) throws IgniteCheckedException; /** + * Synchronously waits for completion of the computation and returns computation result ignoring interrupts. + * + * @return Computation result. + * @throws IgniteFutureCancelledCheckedException Subclass of {@link IgniteCheckedException} throws if computation + * was cancelled. + * @throws IgniteCheckedException If computation failed. + */ + public R getUninterruptibly() throws IgniteCheckedException; + + /** * Cancels this future. * * @return {@code True} if future was canceled (i.e. was not finished prior to this call). http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index 53017c9..ed5ad77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -479,6 +479,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda return super.get(timeout, unit); } + /** {@inheritDoc} */ + @Override public Collection<R> getUninterruptibly() throws IgniteCheckedException { + if (!isDone()) + loadAllPages(); + + return super.getUninterruptibly(); + } /** * @param nodeId Sender node id. http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index b98c5d8..aabe503 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -478,49 +478,46 @@ public class IgfsMetaManager extends IgfsManager { * @param modificationTime Modification time to write to file info. * @throws IgniteCheckedException If failed. */ - public void unlock(IgfsFileInfo info, long modificationTime) throws IgniteCheckedException { + public void unlock(final IgfsFileInfo info, final long modificationTime) throws IgniteCheckedException { assert validTxState(false); assert info != null; if (busyLock.enterBusy()) { try { - IgniteUuid lockId = info.lockId(); + final IgniteUuid lockId = info.lockId(); if (lockId == null) return; // Temporary clear interrupted state for unlocking. - boolean interrupted = Thread.interrupted(); - - IgniteUuid fileId = info.id(); - - IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ); + final boolean interrupted = Thread.interrupted(); try { - // Lock file ID for this transaction. - IgfsFileInfo oldInfo = info(fileId); + IgfsUtils.doInTransactionWithRetries(metaCache, new IgniteOutClosureX<Void>() { + @Override public Void applyx() throws IgniteCheckedException { + IgniteUuid fileId = info.id(); - if (oldInfo == null) - throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId)); + // Lock file ID for this transaction. + IgfsFileInfo oldInfo = info(fileId); - if (!info.lockId().equals(oldInfo.lockId())) - throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId + - ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']'); + if (oldInfo == null) + throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId)); - IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime); + if (!info.lockId().equals(oldInfo.lockId())) + throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId + + ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']'); - boolean put = metaCache.put(fileId, newInfo); + IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime); - assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']'; + boolean put = metaCache.put(fileId, newInfo); - tx.commit(); - } - catch (GridClosureException e) { - throw U.cast(e); + assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']'; + + return null; + } + }); } finally { - tx.close(); - assert validTxState(false); if (interrupted) http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 8026a44..7449f31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -18,18 +18,31 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.lang.reflect.*; +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + /** * Common IGFS utility methods. */ public class IgfsUtils { + /** Maximum number of file unlock transaction retries when topology changes. */ + private static final int MAX_CACHE_TX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100); + /** * Converts any passed exception to IGFS exception. * @@ -104,4 +117,43 @@ public class IgfsUtils { return user; } + + /** + * Performs an operation with transaction with retries. + * + * @param cache Cache to do the transaction on. + * @param clo Closure. + * @return Result of closure execution. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public static <T> T doInTransactionWithRetries(IgniteInternalCache cache, IgniteOutClosureX<T> clo) + throws IgniteCheckedException { + assert cache != null; + + int attempts = 0; + + while (attempts < MAX_CACHE_TX_RETRIES) { + try (Transaction tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + T res = clo.applyx(); + + tx.commit(); + + return res; + } + catch (IgniteException | IgniteCheckedException e) { + ClusterTopologyException cte = X.cause(e, ClusterTopologyException.class); + + if (cte != null) + ((IgniteFutureImpl)cte.retryReadyFuture()).internalFuture().getUninterruptibly(); + else + throw U.cast(e); + } + + attempts++; + } + + throw new IgniteCheckedException("Failed to perform operation since max number of attempts " + + "exceeded. [maxAttempts=" + MAX_CACHE_TX_RETRIES + ']'); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java index 242e626..2adee90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java @@ -126,6 +126,11 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { } /** {@inheritDoc} */ + @Override public T getUninterruptibly() throws IgniteCheckedException { + return get(); + } + + /** {@inheritDoc} */ @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<T>> lsnr) { assert lsnr != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index f8caf22..91ce549 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -107,6 +107,43 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements /** {@inheritDoc} */ @Override public R get() throws IgniteCheckedException { + return get0(ignoreInterrupts); + } + + /** {@inheritDoc} */ + @Override public R getUninterruptibly() throws IgniteCheckedException { + return get0(true); + } + + /** {@inheritDoc} */ + @Override public R get(long timeout) throws IgniteCheckedException { + // Do not replace with static import, as it may not compile. + return get(timeout, TimeUnit.MILLISECONDS); + } + + /** {@inheritDoc} */ + @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException { + A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout); + A.notNull(unit, "unit"); + + try { + return get0(unit.toNanos(timeout)); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e); + } + } + + /** + * Internal get routine. + * + * @param ignoreInterrupts Whether to ignore interrupts. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private R get0(boolean ignoreInterrupts) throws IgniteCheckedException { try { if (endTime == 0) { if (ignoreInterrupts) @@ -132,27 +169,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements } } - /** {@inheritDoc} */ - @Override public R get(long timeout) throws IgniteCheckedException { - // Do not replace with static import, as it may not compile. - return get(timeout, TimeUnit.MILLISECONDS); - } - - /** {@inheritDoc} */ - @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException { - A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout); - A.notNull(unit, "unit"); - - try { - return get0(unit.toNanos(timeout)); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e); - } - } - /** * @param nanosTimeout Timeout (nanoseconds). * @return Result.
