IGNITE-1678 rework reservation, add reservation context

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0415064f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0415064f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0415064f

Branch: refs/heads/ignite-1678
Commit: 0415064f4dbbfb3bea6d359467af1525af235b68
Parents: cf9a98f
Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Authored: Tue Aug 28 11:36:57 2018 +0300
Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Committed: Tue Aug 28 11:36:57 2018 +0300

----------------------------------------------------------------------
 .../GridCacheAtomicSequenceImpl.java            | 302 ++++++++++---------
 1 file changed, 154 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0415064f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index ba75bfa..70d51ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -74,27 +74,12 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
     /** Reservation percentage. */
     private volatile int reservePercentage;
 
-    /** Reserved bottom bound of local counter (included). */
-    private volatile long reservedBottomBound;
-
-    /** Reserved upper bound of local counter (not included). */
-    private volatile long reservedUpBound;
-
-    /** A limit after which a new reservation should be done. */
-    private volatile long newReservationLine;
-
-    /** Reservation future. */
-    private volatile IgniteInternalFuture<?> reservationFut;
-
-    /** Reservation pool. */
-    private final byte poolPlc = GridIoPolicy.SYSTEM_POOL;
+    /** */
+    private ReservationBoundsContext reservationCtx;
 
     /** Synchronization lock for local value updates. */
     private final Lock localUpdateLock = new ReentrantLock();
 
-    /** */
-    private final Callable<Void> reserveCallableWithZeroOffset = 
reserveCallable(0);
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -131,10 +116,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
         this.upBound = upBound;
         this.locVal = locVal;
 
-        reservedBottomBound = locVal;
-        reservedUpBound = upBound;
-        // Calculate next reservation bound.
-        newReservationLine = calculateNewReservationLine(locVal);
+        reservationCtx = new ReservationBoundsContext(new 
ReservationBoundsResult(locVal, upBound));
     }
 
     /** {@inheritDoc} */
@@ -200,80 +182,30 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
     private long internalUpdate(long l, boolean updated) throws 
IgniteCheckedException {
         assert l > 0;
 
-        while (true){
+        while (true) {
             checkRemoved();
 
             localUpdateLock.lock();
 
             try {
-                IgniteInternalFuture<?> reservation = reservationFut;
-
-                boolean reservationInProgress = reservation != null;
-
-                long newLocalVal = locVal + l;
-
-                // Reserve new interval if operation is not in progress.
-                if (newLocalVal >= newReservationLine && newLocalVal <= 
reservedUpBound && !reservationInProgress) {
-                    reservationFut = reservation = runAsyncReservation(0);
-
-                    reservationInProgress = true;
-                }
-
                 long locVal0 = locVal;
 
+                long newLocalVal = 
reservationCtx.calculateNewLocalValue(locVal0, l);
+
                 if (newLocalVal <= upBound) {
                     locVal = newLocalVal;
 
                     return updated ? newLocalVal : locVal0;
                 }
-
-                // Await complete previous reservation.
-                if (reservationInProgress){
-                    reservation.get();
-
-                    reservationFut = null;
-
-                    // Retry check bounds.
-                    continue;
-                }
-
-                // Still in reserved interval.
-                if (newLocalVal < reservedUpBound) {
-                    long curVal = locVal;
-
-                    if (newLocalVal < reservedBottomBound)
-                        locVal = reservedBottomBound;
-                    else
-                        locVal += l;
-
-                    upBound = reservedUpBound;
-
-                    return updated ? locVal : curVal;
-                }
-                // Switched to the next interval. New value more that upper 
reserved bound.
-                else  {
-                    assert !reservationInProgress;
-
-                    long diff = newLocalVal - reservedUpBound;
-
-                    // Calculate how many batch size included in l.
-                    // It will our offset for global seq counter.
-                    long off = (diff / batchSize) * batchSize;
-
-                    reservationFut = reservation = runAsyncReservation(off);
-
-                    // Can not wait async, should wait under lock until new 
interval reserved.
-                    reservation.get();
-
-                    reservationFut = null;
-                }
             }
             finally {
                 localUpdateLock.unlock();
             }
+
+            // Await not under the lock.
+            reservationCtx.awaitCompleteReservation();
         }
     }
-
     /** {@inheritDoc} */
     @Override public int batchSize() {
         return batchSize;
@@ -331,77 +263,6 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
         }
     }
 
-    /**
-     * Runs async reservation of new range for current node.
-     *
-     * @param off Offset.
-     * @return Future.
-     */
-    private IgniteInternalFuture<?> runAsyncReservation(final long off) {
-        assert off >= 0 : off;
-
-        GridFutureAdapter<?> resFut = new GridFutureAdapter<>();
-
-        resFut.listen(f -> {
-            if (f.error() == null)
-                reservationFut = null; // Reset null only if there was not an 
error.
-        });
-
-        ctx.kernalContext().closure().runLocalSafe(() -> {
-            Callable<Void> reserveCall = off == 0 ? 
reserveCallableWithZeroOffset: reserveCallable(off);
-
-            try {
-                CU.retryTopologySafe(reserveCall);
-
-                resFut.onDone();
-            }
-            catch (Throwable h) {
-                resFut.onDone(h);
-            }
-        }, poolPlc);
-
-        return resFut;
-    }
-
-    /**
-     * @param off Reservation offset.
-     * @return Callable for reserved new interval.
-     */
-    private Callable<Void> reserveCallable(long off){
-       return new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, 
PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicSequenceValue seq = cacheView.get(key);
-
-                    checkRemoved();
-
-                    assert seq != null;
-
-                    long curGlobalVal = seq.get();
-
-                    reservedBottomBound = curGlobalVal + off;
-
-                    reservedUpBound = reservedBottomBound + (batchSize > 1 ? 
batchSize - 1 : 1);
-
-                    newReservationLine = 
calculateNewReservationLine(reservedBottomBound);
-
-                    seq.set(reservedUpBound + 1);
-
-                    cacheView.put(key, seq);
-
-                    tx.commit();
-                }
-                catch (Error | Exception e) {
-                    if(!X.hasCause(e, ClusterTopologyCheckedException.class))
-                        U.error(log, "Failed to get and add: " + this, e);
-
-                    throw e;
-                }
-
-                return null;
-            }
-        };
-    }
 
     /**
      * @return New reservation line.
@@ -448,4 +309,149 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
     @Override public String toString() {
         return S.toString(GridCacheAtomicSequenceImpl.class, this);
     }
+
+    /**
+     *
+     */
+    private class ReservationBoundsResult {
+        /** Reserved bottom bound of local counter (included). */
+        private final long reservedBottomBound;
+
+        /** Reserved upper bound of local counter (not included). */
+        private final long reservedUpBound;
+
+        /** A limit after which a new reservation should be done. */
+        private final long newReservationLine;
+
+        private ReservationBoundsResult(long reservedBottomBound, long 
reservedUpBound) {
+            this.reservedBottomBound = reservedBottomBound;
+            this.reservedUpBound = reservedUpBound;
+            this.newReservationLine = 
calculateNewReservationLine(reservedBottomBound);
+        }
+    }
+
+    private class ReservationBoundsContext {
+
+        /** Reservation pool. */
+        private final byte poolPlc = GridIoPolicy.SYSTEM_POOL;
+
+        private ReservationBoundsResult bounds;
+
+        /** Reservation future. */
+        private IgniteInternalFuture<ReservationBoundsResult> reservationFut;
+
+        /** */
+        private final Callable<ReservationBoundsResult> 
reserveCallableWithZeroOffset = reserveCallable(0);
+
+        private ReservationBoundsContext(ReservationBoundsResult bounds) {
+            this.bounds = bounds;
+        }
+
+        void awaitCompleteReservation() throws IgniteCheckedException {
+          /*  IgniteInternalFuture<ReservationBoundsResult> fut = 
reservationFut;
+
+            if (fut != null)
+                fut.get();*/
+        }
+
+        long calculateNewLocalValue(long current, long delta) throws 
IgniteCheckedException {
+            long newVal = current + delta;
+
+            long offset = 0;
+
+            if (reservationFut == null) {
+                // Reserve new interval if operation is not in progress.
+                if (newVal >= bounds.newReservationLine && newVal <= 
bounds.reservedUpBound)
+                    reservationFut = runAsyncReservation(offset);
+                else if (newVal > bounds.reservedUpBound) {
+                    offset = newVal - bounds.reservedUpBound - 1;
+
+                    reservationFut = runAsyncReservation(offset);
+                }
+            }
+
+            if (newVal > upBound) {
+                bounds = reservationFut.get();
+
+                if (upBound < bounds.reservedBottomBound) {
+                    long diff = delta - (upBound - current) - 1;
+
+                    newVal = bounds.reservedBottomBound + diff - offset;
+                }
+
+                upBound = bounds.reservedUpBound;
+
+                reservationFut = null;
+            }
+
+            return newVal;
+        }
+
+        /**
+         * Runs async reservation of new range for current node.
+         *
+         * @param off Offset.
+         * @return Future.
+         */
+        private IgniteInternalFuture<ReservationBoundsResult> 
runAsyncReservation(final long off) {
+            assert off >= 0 : off;
+
+            GridFutureAdapter<ReservationBoundsResult> resFut = new 
GridFutureAdapter<>();
+
+            ctx.kernalContext().closure().runLocalSafe(() -> {
+                Callable<ReservationBoundsResult> reserveCall = off == 0
+                    ? reserveCallableWithZeroOffset: reserveCallable(off);
+
+                try {
+                    resFut.onDone(CU.retryTopologySafe(reserveCall));
+                }
+                catch (Throwable h) {
+                    resFut.onDone(h);
+                }
+            }, poolPlc);
+
+            return resFut;
+        }
+
+        /**
+         * @param off Reservation offset.
+         * @return Callable for reserved new interval.
+         */
+        private Callable<ReservationBoundsResult> reserveCallable(long off){
+            return new Callable<ReservationBoundsResult>() {
+                @Override public ReservationBoundsResult call() throws 
Exception {
+                    long bottomBound;
+                    long upBound;
+
+                    try (GridNearTxLocal tx = CU.txStartInternal(ctx, 
cacheView, PESSIMISTIC, REPEATABLE_READ)) {
+                        GridCacheAtomicSequenceValue seq = cacheView.get(key);
+
+                        checkRemoved();
+
+                        assert seq != null;
+
+                        long curGlobalVal = seq.get();
+
+                        bottomBound = curGlobalVal + off;
+
+                        upBound = bottomBound + (batchSize > 1 ? batchSize - 1 
: 1);
+
+                        seq.set(upBound + 1);
+
+                        cacheView.put(key, seq);
+
+                        tx.commit();
+                    }
+                    catch (Error | Exception e) {
+                        if (!X.hasCause(e, 
ClusterTopologyCheckedException.class))
+                            U.error(log, "Failed to get and add: " + this, e);
+
+                        throw e;
+                    }
+
+                    return new ReservationBoundsResult(bottomBound, upBound);
+                }
+            };
+        }
+    }
 }

Reply via email to