This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new db713f3  [DLOG] use Atomic***FieldUpdater and LongAdder if possible
db713f3 is described below

commit db713f32c24cc9345d8b813aeef9d079b254d16a
Author: Sijie Guo <[email protected]>
AuthorDate: Wed Mar 28 22:17:00 2018 -0700

    [DLOG] use Atomic***FieldUpdater and LongAdder if possible
    
    Descriptions of the changes in this PR:
    
    AtomicFieldUpdater + volatile provides similar guarantees as Atomic but use 
much fewer memory.
    In dlog, when handling large number of streams, there can be a lot of 
Atomic fields with `LogHandler`, `SegmentWriter` and such.
    Switching using Atomic to using AtomicFieldUpdater + volatile will save a 
lot of memory. See 
[details](http://normanmaurer.me/blog/2013/10/28/Lesser-known-concurrent-classes-Part-1/)
    
    LongAdder is less contended across threads, which is more preferable to 
AtomicLong when multiple threads update a common sum. E.g. 
SampledMovingAverageRate.
    Switching to use LongAdder if AtomicLong is not needed. See 
[details](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/LongAdder.html)
    
    Master Issue: #<master-issue-number>
    
    Author: Sijie Guo <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai 
<[email protected]>, Philip Su <[email protected]>
    
    This closes #1299 from sijie/use_atomic_field_updater
---
 .../common/rate/SampledMovingAverageRate.java      |  10 +-
 .../apache/distributedlog/BKAsyncLogReader.java    |  44 +++---
 .../org/apache/distributedlog/BKLogHandler.java    |  13 +-
 .../apache/distributedlog/BKLogReadHandler.java    |   4 +-
 .../apache/distributedlog/BKLogSegmentWriter.java  | 151 ++++++++++++---------
 .../impl/logsegment/BKLogSegmentEntryReader.java   |  41 +++---
 .../subscription/ZKSubscriptionStateStore.java     |  18 ++-
 .../distributedlog/lock/ZKDistributedLock.java     |   9 +-
 .../apache/distributedlog/lock/ZKSessionLock.java  |  33 ++---
 .../distributedlog/util/SimplePermitLimiter.java   |  19 +--
 .../distributedlog/zk/LimitedPermitManager.java    |  29 ++--
 .../apache/distributedlog/zk/ZKWatcherManager.java |  14 +-
 .../distributedlog/lock/TestZKSessionLock.java     |   8 +-
 .../apache/distributedlog/fs/DLOutputStream.java   |  10 +-
 14 files changed, 226 insertions(+), 177 deletions(-)

diff --git 
a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
 
b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
index 2c89d64..d8c72af 100644
--- 
a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
+++ 
b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
@@ -20,7 +20,7 @@ package org.apache.distributedlog.common.rate;
 import com.google.common.base.Ticker;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
@@ -30,7 +30,7 @@ class SampledMovingAverageRate implements MovingAverageRate {
 
     private static final long NANOS_PER_SEC = TimeUnit.SECONDS.toNanos(1);
 
-    private final AtomicLong total;
+    private final LongAdder total;
     private final Ticker ticker;
     private final double scaleFactor;
     private final LinkedBlockingDeque<Pair<Long, Long>> samples;
@@ -45,7 +45,7 @@ class SampledMovingAverageRate implements MovingAverageRate {
                              double scaleFactor,
                              Ticker ticker) {
         this.value = 0;
-        this.total = new AtomicLong(0);
+        this.total = new LongAdder();
         this.scaleFactor = scaleFactor;
         this.ticker = ticker;
         this.samples = new LinkedBlockingDeque<>(intervalSecs);
@@ -58,7 +58,7 @@ class SampledMovingAverageRate implements MovingAverageRate {
 
     @Override
     public void add(long amount) {
-        total.getAndAdd(amount);
+        total.add(amount);
     }
 
     @Override
@@ -71,7 +71,7 @@ class SampledMovingAverageRate implements MovingAverageRate {
     }
 
     private double doSample() {
-        long newSample = total.get();
+        long newSample = total.sum();
         long newTimestamp = ticker.read();
 
         double rate = 0;
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 3373896..6adcbc4 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -28,8 +28,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -77,12 +77,16 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
     private final String streamName;
     protected final BKDistributedLogManager bkDistributedLogManager;
     protected final BKLogReadHandler readHandler;
-    private final AtomicReference<Throwable> lastException = new 
AtomicReference<Throwable>();
+    private static final AtomicReferenceFieldUpdater<BKAsyncLogReader, 
Throwable> lastExceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(BKAsyncLogReader.class, 
Throwable.class, "lastException");
+    private volatile Throwable lastException = null;
     private final OrderedScheduler scheduler;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests =
             new ConcurrentLinkedQueue<PendingReadRequest>();
     private final Object scheduleLock = new Object();
-    private final AtomicLong scheduleCount = new AtomicLong(0);
+    private static final AtomicLongFieldUpdater<BKAsyncLogReader> 
scheduleCountUpdater =
+        AtomicLongFieldUpdater.newUpdater(BKAsyncLogReader.class, 
"scheduleCount");
+    private volatile long scheduleCount = 0L;
     private final Stopwatch scheduleDelayStopwatch;
     private final Stopwatch readNextDelayStopwatch;
     private DLSN startDLSN;
@@ -340,7 +344,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
     }
 
     private boolean checkClosedOrInError(String operation) {
-        if (null == lastException.get()) {
+        if (null == lastExceptionUpdater.get(this)) {
             try {
                 if (null != readHandler && null != getReadAheadReader()) {
                     getReadAheadReader().checkLastException();
@@ -360,9 +364,10 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
             }
         }
 
-        if (null != lastException.get()) {
+        Throwable cause = lastExceptionUpdater.get(this);
+        if (null != cause) {
             LOG.trace("Cancelling pending reads");
-            cancelAllPendingReads(lastException.get());
+            cancelAllPendingReads(cause);
             return true;
         }
 
@@ -370,7 +375,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
     }
 
     private void setLastException(IOException exc) {
-        lastException.compareAndSet(null, exc);
+        lastExceptionUpdater.compareAndSet(this, null, exc);
     }
 
     @Override
@@ -446,7 +451,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
         }
 
         if (checkClosedOrInError("readNext")) {
-            readRequest.completeExceptionally(lastException.get());
+            readRequest.completeExceptionally(lastExceptionUpdater.get(this));
         } else {
             boolean queueEmpty = pendingRequests.isEmpty();
             pendingRequests.add(readRequest);
@@ -469,7 +474,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
             return;
         }
 
-        long prevCount = scheduleCount.getAndIncrement();
+        long prevCount = scheduleCountUpdater.getAndIncrement(this);
         if (0 == prevCount) {
             scheduleDelayStopwatch.reset().start();
             scheduler.submitOrdered(streamName, this);
@@ -574,7 +579,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
 
             Stopwatch runTime = Stopwatch.createStarted();
             int iterations = 0;
-            long scheduleCountLocal = scheduleCount.get();
+            long scheduleCountLocal = scheduleCountUpdater.get(this);
             LOG.debug("{}: Scheduled Background Reader", 
readHandler.getFullyQualifiedName());
             while (true) {
                 if (LOG.isTraceEnabled()) {
@@ -588,7 +593,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
                     // Queue is empty, nothing to read, return
                     if (null == nextRequest) {
                         LOG.trace("{}: Queue Empty waiting for Input", 
readHandler.getFullyQualifiedName());
-                        scheduleCount.set(0);
+                        scheduleCountUpdater.set(this, 0);
                         backgroundReaderRunTime.registerSuccessfulEvent(
                             runTime.stop().elapsed(TimeUnit.MICROSECONDS), 
TimeUnit.MICROSECONDS);
                         return;
@@ -605,7 +610,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
                 // If the oldest pending promise is interrupted then we must 
mark
                 // the reader in error and abort all pending reads since we 
dont
                 // know the last consumed read
-                if (null == lastException.get()) {
+                if (null == lastExceptionUpdater.get(this)) {
                     if (nextRequest.getPromise().isCancelled()) {
                         setLastException(new 
DLInterruptedException("Interrupted on reading "
                             + readHandler.getFullyQualifiedName()));
@@ -613,8 +618,9 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
                 }
 
                 if (checkClosedOrInError("readNext")) {
-                    if (!(lastException.get().getCause() instanceof 
LogNotFoundException)) {
-                        LOG.warn("{}: Exception", 
readHandler.getFullyQualifiedName(), lastException.get());
+                    Throwable lastException = lastExceptionUpdater.get(this);
+                    if (lastException != null && !(lastException.getCause() 
instanceof LogNotFoundException)) {
+                        LOG.warn("{}: Exception", 
readHandler.getFullyQualifiedName(), lastException);
                     }
                     backgroundReaderRunTime.registerFailedEvent(
                         runTime.stop().elapsed(TimeUnit.MICROSECONDS), 
TimeUnit.MICROSECONDS);
@@ -659,7 +665,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
                     setLastException(exc);
                     if (!(exc instanceof LogNotFoundException)) {
                         LOG.warn("{} : read with skip Exception",
-                                readHandler.getFullyQualifiedName(), 
lastException.get());
+                                readHandler.getFullyQualifiedName(), 
lastExceptionUpdater.get(this));
                     }
                     continue;
                 }
@@ -670,7 +676,7 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
                         backgroundReaderRunTime.registerSuccessfulEvent(
                             runTime.stop().elapsed(TimeUnit.MICROSECONDS), 
TimeUnit.MICROSECONDS);
                         scheduleDelayStopwatch.reset().start();
-                        scheduleCount.set(0);
+                        scheduleCountUpdater.set(this, 0);
                         // the request could still wait for more records
                         backgroundScheduleTask = scheduler.scheduleOrdered(
                                 streamName,
@@ -702,12 +708,12 @@ class BKAsyncLogReader implements AsyncLogReader, 
SafeRunnable, AsyncNotificatio
                     }
                 } else {
                     if (0 == scheduleCountLocal) {
-                        LOG.trace("Schedule count dropping to zero", 
lastException.get());
+                        LOG.trace("Schedule count dropping to zero", 
lastExceptionUpdater.get(this));
                         backgroundReaderRunTime.registerSuccessfulEvent(
                             runTime.stop().elapsed(TimeUnit.MICROSECONDS), 
TimeUnit.MICROSECONDS);
                         return;
                     }
-                    scheduleCountLocal = scheduleCount.decrementAndGet();
+                    scheduleCountLocal = 
scheduleCountUpdater.decrementAndGet(this);
                 }
             }
         }
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
index afcd7bb..a319d44 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
@@ -31,7 +31,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -97,7 +97,9 @@ abstract class BKLogHandler implements AsyncCloseable, 
AsyncAbortable {
     protected final AlertStatsLogger alertStatsLogger;
     protected volatile boolean reportGetSegmentStats = false;
     private final String lockClientId;
-    protected final AtomicReference<IOException> metadataException = new 
AtomicReference<IOException>(null);
+    protected static final AtomicReferenceFieldUpdater<BKLogHandler, 
IOException> METADATA_EXCEPTION_UPDATER =
+        AtomicReferenceFieldUpdater.newUpdater(BKLogHandler.class, 
IOException.class, "metadataException");
+    private volatile IOException metadataException = null;
 
     // Maintain the list of log segments per stream
     protected final PerStreamLogSegmentCache logSegmentCache;
@@ -155,8 +157,9 @@ abstract class BKLogHandler implements AsyncCloseable, 
AsyncAbortable {
     }
 
     BKLogHandler checkMetadataException() throws IOException {
-        if (null != metadataException.get()) {
-            throw metadataException.get();
+        IOException ioe = METADATA_EXCEPTION_UPDATER.get(this);
+        if (null != ioe) {
+            throw ioe;
         }
         return this;
     }
@@ -480,7 +483,7 @@ abstract class BKLogHandler implements AsyncCloseable, 
AsyncAbortable {
             // the log segments cache went wrong
             LOG.error("Unexpected exception on getting log segments from the 
cache for stream {}",
                     getFullyQualifiedName(), ue);
-            metadataException.compareAndSet(null, ue);
+            METADATA_EXCEPTION_UPDATER.compareAndSet(this, null, ue);
             throw ue;
         }
     }
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
index 66ca2de..1175f39 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
@@ -272,7 +272,7 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
                         || cause instanceof LogSegmentNotFoundException
                         || cause instanceof UnexpectedException) {
                     // indicate some inconsistent behavior, abort
-                    metadataException.compareAndSet(null, (IOException) cause);
+                    
METADATA_EXCEPTION_UPDATER.compareAndSet(BKLogReadHandler.this, null, 
(IOException) cause);
                     // notify the reader that read handler is in error state
                     notifyReaderOnError(cause);
                     FutureUtils.completeExceptionally(promise, cause);
@@ -318,7 +318,7 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
                         || cause instanceof LogSegmentNotFoundException
                         || cause instanceof UnexpectedException) {
                     // indicate some inconsistent behavior, abort
-                    metadataException.compareAndSet(null, (IOException) cause);
+                    
METADATA_EXCEPTION_UPDATER.compareAndSet(BKLogReadHandler.this, null, 
(IOException) cause);
                     // notify the reader that read handler is in error state
                     notifyReaderOnError(cause);
                     return;
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 6263ddf..4ad977b 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -34,8 +34,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.distributedlog.Entry.Writer;
 import org.apache.distributedlog.common.stats.OpStatsListener;
 import org.apache.distributedlog.common.util.PermitLimiter;
@@ -158,13 +159,16 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     private final int logSegmentMetadataVersion;
     private BKTransmitPacket packetPrevious;
     private Entry.Writer recordSetWriter;
-    private final AtomicInteger outstandingTransmits;
+    private static final AtomicIntegerFieldUpdater<BKLogSegmentWriter> 
outstandingTransmitsUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentWriter.class, 
"outstandingTransmits");
+    private volatile int outstandingTransmits = 0;
     private final int transmissionThreshold;
     protected final LogSegmentEntryWriter entryWriter;
     private final CompressionCodec.Type compressionType;
     private final ReentrantLock transmitLock = new ReentrantLock();
-    private final AtomicInteger transmitResult =
-            new AtomicInteger(BKException.Code.OK);
+    private static final AtomicIntegerFieldUpdater<BKLogSegmentWriter> 
transmitResultUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentWriter.class, 
"transmitResult");
+    private volatile int transmitResult = BKException.Code.OK;
     private final DistributedLock lock;
     private final boolean isDurableWriteEnabled;
     private DLSN lastDLSN = DLSN.InvalidDLSN;
@@ -188,11 +192,24 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     private boolean streamEnded = false;
     private final ScheduledFuture<?> periodicFlushSchedule;
     private final ScheduledFuture<?> periodicKeepAliveSchedule;
-    private final AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef =
-            new AtomicReference<ScheduledFuture<?>>(null);
-    private final AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef =
-            new AtomicReference<ScheduledFuture<?>>(null);
-    private final AtomicReference<Exception> scheduledFlushException = new 
AtomicReference<Exception>(null);
+    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, 
ScheduledFuture>
+        transmitSchedFutureRefUpdater = AtomicReferenceFieldUpdater.newUpdater(
+            BKLogSegmentWriter.class,
+            ScheduledFuture.class,
+            "transmitSchedFutureRef");
+    private volatile ScheduledFuture transmitSchedFutureRef = null;
+    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, 
ScheduledFuture>
+        immFlushSchedFutureRefUpdater = AtomicReferenceFieldUpdater.newUpdater(
+            BKLogSegmentWriter.class,
+            ScheduledFuture.class,
+            "immFlushSchedFutureRef");
+    private volatile ScheduledFuture immFlushSchedFutureRef = null;
+    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, 
Exception> scheduledFlushExceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(
+            BKLogSegmentWriter.class,
+            Exception.class,
+            "scheduledFlushException");
+    private volatile Exception scheduledFlushException = null;
     private boolean enforceLock = true;
     private CompletableFuture<Void> closeFuture = null;
     private final boolean enableRecordCounts;
@@ -300,12 +317,11 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             }
             @Override
             public Number getSample() {
-                return outstandingTransmits.get();
+                return 
outstandingTransmitsUpdater.get(BKLogSegmentWriter.this);
             }
         };
         transmitOutstandingLogger.registerGauge("requests", 
transmitOutstandingGauge);
 
-        outstandingTransmits = new AtomicInteger(0);
         this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
         this.streamName = streamName;
         this.logSegmentMetadataVersion = logSegmentMetadataVersion;
@@ -395,7 +411,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
 
     @VisibleForTesting
     void setTransmitResult(int rc) {
-        transmitResult.set(rc);
+        transmitResultUpdater.set(this, rc);
     }
 
     @VisibleForTesting
@@ -510,7 +526,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         if (null != packet) {
             EntryBuffer recordSet = packet.getRecordSet();
             numRecords = recordSet.getNumRecords();
-            int rc = transmitResult.get();
+            int rc = transmitResultUpdater.get(this);
             if (BKException.Code.OK == rc) {
                 rc = BKException.Code.InterruptedException;
             }
@@ -537,13 +553,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             closePromise = closeFuture = new CompletableFuture<Void>();
         }
 
-        AtomicReference<Throwable> throwExc = new 
AtomicReference<Throwable>(null);
+        MutableObject<Throwable> throwExc = new MutableObject<>(null);
         closeInternal(abort, throwExc, closePromise);
         return closePromise;
     }
 
     private void closeInternal(final boolean abort,
-                               final AtomicReference<Throwable> throwExc,
+                               final MutableObject<Throwable> throwExc,
                                final CompletableFuture<Void> closePromise) {
         // clean stats resources
         this.transmitOutstandingLogger.unregisterGauge("requests", 
transmitOutstandingGauge);
@@ -579,7 +595,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
 
                 @Override
                 public void onFailure(Throwable cause) {
-                    throwExc.set(cause);
+                    throwExc.setValue(cause);
                     abortTransmitPacketOnClose(abort, throwExc, closePromise);
                 }
             });
@@ -590,12 +606,12 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     }
 
     private void abortTransmitPacketOnClose(final boolean abort,
-                                            final AtomicReference<Throwable> 
throwExc,
+                                            final MutableObject<Throwable> 
throwExc,
                                             final CompletableFuture<Void> 
closePromise) {
         LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :"
                         + " lastDLSN = {} outstandingTransmits = {} 
writesPendingTransmit = {}",
                 new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(),
-                        outstandingTransmits.get(), 
getWritesPendingTransmit()});
+                        outstandingTransmitsUpdater.get(this), 
getWritesPendingTransmit()});
 
         // Save the current packet to reset, leave a new empty packet to avoid 
a race with
         // addCompleteDeferredProcessing.
@@ -629,10 +645,10 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     }
 
     private void closeLedgerOnClose(final boolean abort,
-                                    final AtomicReference<Throwable> throwExc,
+                                    final MutableObject<Throwable> throwExc,
                                     final CompletableFuture<Void> 
closePromise) {
         // close the log segment if it isn't in error state, so all the 
outstanding addEntry(s) will callback.
-        if (null == throwExc.get() && !isLogSegmentInError()) {
+        if (null == throwExc.getValue() && !isLogSegmentInError()) {
             // Synchronous closing the ledger handle, if we couldn't close a 
ledger handle successfully.
             // we should throw the exception to #closeToFinalize, so it would 
fail completing a log segment.
             entryWriter.asyncClose(new CloseCallback() {
@@ -640,7 +656,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                 public void closeComplete(int rc, LedgerHandle lh, Object ctx) 
{
                     if (BKException.Code.OK != rc && 
BKException.Code.LedgerClosedException != rc) {
                         if (!abort) {
-                            throwExc.set(new IOException("Failed to close 
ledger for "
+                            throwExc.setValue(new IOException("Failed to close 
ledger for "
                                     + fullyQualifiedLogSegment + " : " + 
BKException.getMessage(rc)));
                         }
                     }
@@ -653,17 +669,17 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     }
 
     private void completeClosePromise(final boolean abort,
-                                      final AtomicReference<Throwable> 
throwExc,
+                                      final MutableObject<Throwable> throwExc,
                                       final CompletableFuture<Void> 
closePromise) {
         // If add entry failed because of closing ledger above, we don't need 
to fail the close operation
-        if (!abort && null == throwExc.get() && 
shouldFailCompleteLogSegment()) {
-            throwExc.set(new BKTransmitException("Closing an errored stream : 
", transmitResult.get()));
+        if (!abort && null == throwExc.getValue() && 
shouldFailCompleteLogSegment()) {
+            throwExc.setValue(new BKTransmitException("Closing an errored 
stream : ", transmitResultUpdater.get(this)));
         }
 
-        if (null == throwExc.get()) {
+        if (null == throwExc.getValue()) {
             FutureUtils.complete(closePromise, null);
         } else {
-            FutureUtils.completeExceptionally(closePromise, throwExc.get());
+            FutureUtils.completeExceptionally(closePromise, 
throwExc.getValue());
         }
     }
 
@@ -721,9 +737,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                     
BKException.getMessage(BKException.Code.LedgerClosedException));
         }
 
-        if (BKException.Code.OK != transmitResult.get()) {
+        if (BKException.Code.OK != transmitResultUpdater.get(this)) {
             // Failfast if the stream already encountered error with safe 
retry on the client
-            throw new WriteException(fullyQualifiedLogSegment, 
BKException.getMessage(transmitResult.get()));
+            throw new WriteException(fullyQualifiedLogSegment, 
BKException.getMessage(transmitResultUpdater.get(this)));
         }
 
         if (streamEnded) {
@@ -777,12 +793,12 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     }
 
     boolean isLogSegmentInError() {
-        return (transmitResult.get() != BKException.Code.OK);
+        return (transmitResultUpdater.get(this) != BKException.Code.OK);
     }
 
     boolean shouldFailCompleteLogSegment() {
-        return (transmitResult.get() != BKException.Code.OK)
-                && (transmitResult.get() != 
BKException.Code.LedgerClosedException);
+        return (transmitResultUpdater.get(this) != BKException.Code.OK)
+                && (transmitResultUpdater.get(this) != 
BKException.Code.LedgerClosedException);
     }
 
     public synchronized CompletableFuture<DLSN> writeInternal(LogRecord record)
@@ -977,23 +993,24 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         }
     }
 
-    void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
-                                        final 
AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
+    void scheduleFlushWithDelayIfNeeded(
+            final Callable<?> callable,
+            final AtomicReferenceFieldUpdater<BKLogSegmentWriter, 
ScheduledFuture> scheduledFutureRefUpdater) {
         final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs - 
lastTransmit.elapsed(TimeUnit.MILLISECONDS));
-        final ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
+        final ScheduledFuture scheduledFuture = 
scheduledFutureRefUpdater.get(this);
         if ((null == scheduledFuture) || scheduledFuture.isDone()) {
-            scheduledFutureRef.set(scheduler.schedule(new Runnable() {
+            scheduledFutureRefUpdater.set(this, scheduler.schedule(new 
Runnable() {
                 @Override
                 public void run() {
                     synchronized (this) {
-                        scheduledFutureRef.set(null);
+                        scheduledFutureRefUpdater.set(BKLogSegmentWriter.this, 
null);
                         try {
                             callable.call();
 
                             // Flush was successful or wasn't needed, the 
exception should be unset.
-                            scheduledFlushException.set(null);
+                            
scheduledFlushExceptionUpdater.set(BKLogSegmentWriter.this, null);
                         } catch (Exception exc) {
-                            scheduledFlushException.set(exc);
+                            
scheduledFlushExceptionUpdater.set(BKLogSegmentWriter.this, exc);
                             LOG.error("Delayed flush failed", exc);
                         }
                     }
@@ -1017,15 +1034,16 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                         checkStateAndTransmit();
                         return null;
                     }
-                }, transmitSchedFutureRef);
+                }, transmitSchedFutureRefUpdater);
 
                 // Timing here is not very important--the last flush failed 
and we should
                 // indicate this to the caller. The next flush may succeed and 
unset the
                 // scheduledFlushException in which case the next write will 
succeed (if the caller
                 // hasn't already closed the writer).
-                if (scheduledFlushException.get() != null) {
+                Exception exec = scheduledFlushExceptionUpdater.get(this);
+                if (exec != null) {
                     throw new FlushException("Last flush encountered an error 
while writing data to the backend",
-                        getLastTxId(), getLastTxIdAcknowledged(), 
scheduledFlushException.get());
+                        getLastTxId(), getLastTxIdAcknowledged(), exec);
                 }
             }
         }
@@ -1069,14 +1087,14 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                 checkWriteLock();
                 // If transmitResult is anything other than 
BKException.Code.OK, it means that the
                 // stream has encountered an error and cannot be written to.
-                if (!transmitResult.compareAndSet(BKException.Code.OK,
+                if (!transmitResultUpdater.compareAndSet(this, 
BKException.Code.OK,
                                                   BKException.Code.OK)) {
                     LOG.error("Log Segment {} Trying to write to an errored 
stream; Error is {}",
                               fullyQualifiedLogSegment,
-                              BKException.getMessage(transmitResult.get()));
+                              
BKException.getMessage(transmitResultUpdater.get(this)));
                     throw new BKTransmitException("Trying to write to an 
errored stream;"
-                                                          + " Error code : (" 
+ transmitResult.get() + ") "
-                            + BKException.getMessage(transmitResult.get()), 
transmitResult.get());
+                                                          + " Error code : (" 
+ transmitResultUpdater.get(this) + ") "
+                            + 
BKException.getMessage(transmitResultUpdater.get(this)), 
transmitResultUpdater.get(this));
                 }
 
                 if (recordSetWriter.getNumRecords() == 0) {
@@ -1107,7 +1125,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                           new Object[] {fullyQualifiedLogSegment}, e);
                 // If a write fails here, we need to set the transmit result 
to an error so that
                 // no future writes go through and violate ordering guarantees.
-                transmitResult.set(BKException.Code.WriteException);
+                transmitResultUpdater.set(this, 
BKException.Code.WriteException);
                 if (e instanceof InvalidEnvelopedEntryException) {
                     alertStatsLogger.raise("Invalid enveloped entry for 
segment {} : ", fullyQualifiedLogSegment, e);
                     throw (InvalidEnvelopedEntryException) e;
@@ -1131,7 +1149,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                 }
 
                 lastTransmit.reset().start();
-                outstandingTransmits.incrementAndGet();
+                outstandingTransmitsUpdater.incrementAndGet(this);
                 controlFlushNeeded = false;
                 return packet.getTransmitFuture();
             }
@@ -1145,7 +1163,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
      *  flush task can determine if there is anything it needs to do.
      */
     private synchronized  boolean haveDataToTransmit() {
-        if (!transmitResult.compareAndSet(BKException.Code.OK, 
BKException.Code.OK)) {
+        if (!transmitResultUpdater.compareAndSet(this, BKException.Code.OK, 
BKException.Code.OK)) {
             // Even if there is data it cannot be transmitted, so effectively 
nothing to send
             return false;
         }
@@ -1156,14 +1174,15 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     @Override
     public void addComplete(final int rc, LedgerHandle handle,
                             final long entryId, final Object ctx) {
-        final AtomicReference<Integer> effectiveRC = new 
AtomicReference<Integer>(rc);
+        int rcAfterFailPoint = rc;
         try {
             if 
(FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitComplete))
 {
-                effectiveRC.set(BKException.Code.UnexpectedConditionException);
+                rcAfterFailPoint = 
BKException.Code.UnexpectedConditionException;
             }
         } catch (Exception exc) {
-            effectiveRC.set(BKException.Code.UnexpectedConditionException);
+            rcAfterFailPoint = BKException.Code.UnexpectedConditionException;
         }
+        final int effectiveRC = rcAfterFailPoint;
 
         // Sanity check to make sure we're receiving these callbacks in order.
         if (entryId > -1 && lastEntryId >= entryId) {
@@ -1199,7 +1218,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                     addCompleteQueuedTime.registerSuccessfulEvent(
                         queuedTime.elapsed(TimeUnit.MICROSECONDS),
                         TimeUnit.MICROSECONDS);
-                    addCompleteDeferredProcessing(transmitPacket, entryId, 
effectiveRC.get());
+                    addCompleteDeferredProcessing(transmitPacket, entryId, 
effectiveRC);
                     addCompleteDeferredTime.registerSuccessfulEvent(
                         deferredTime.elapsed(TimeUnit.MICROSECONDS),
                         TimeUnit.MILLISECONDS);
@@ -1223,14 +1242,14 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                 }
             });
             // Race condition if we notify before the addComplete is enqueued.
-            transmitPacket.notifyTransmitComplete(effectiveRC.get());
-            outstandingTransmits.getAndDecrement();
+            transmitPacket.notifyTransmitComplete(effectiveRC);
+            outstandingTransmitsUpdater.getAndDecrement(this);
         } else {
             // Notify transmit complete must be called before deferred 
processing in the
             // sync case since otherwise callbacks in deferred processing may 
deadlock.
-            transmitPacket.notifyTransmitComplete(effectiveRC.get());
-            outstandingTransmits.getAndDecrement();
-            addCompleteDeferredProcessing(transmitPacket, entryId, 
effectiveRC.get());
+            transmitPacket.notifyTransmitComplete(effectiveRC);
+            outstandingTransmitsUpdater.getAndDecrement(this);
+            addCompleteDeferredProcessing(transmitPacket, entryId, 
effectiveRC);
         }
     }
 
@@ -1240,17 +1259,17 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         boolean cancelPendingPromises = false;
         EntryBuffer recordSet = transmitPacket.getRecordSet();
         synchronized (this) {
-            if (transmitResult.compareAndSet(BKException.Code.OK, rc)) {
+            if (transmitResultUpdater.compareAndSet(this, BKException.Code.OK, 
rc)) {
                 // If this is the first time we are setting an error code in 
the transmitResult then
                 // we must cancel pending promises; once this error has been 
set, more records will not
                 // be enqueued; they will be failed with WriteException
                 cancelPendingPromises = (BKException.Code.OK != rc);
             } else {
                 LOG.warn("Log segment {} entryId {}: Tried to set transmit 
result to ({}) but is already ({})",
-                    new Object[] {fullyQualifiedLogSegment, entryId, rc, 
transmitResult.get()});
+                    new Object[] {fullyQualifiedLogSegment, entryId, rc, 
transmitResultUpdater.get(this)});
             }
 
-            if (transmitResult.get() != BKException.Code.OK) {
+            if (transmitResultUpdater.get(this) != BKException.Code.OK) {
                 if (recordSet.hasUserRecords()) {
                     transmitDataPacketSize.registerFailedEvent(
                         recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
@@ -1273,14 +1292,14 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                                     backgroundFlush(true);
                                     return null;
                                 }
-                            }, immFlushSchedFutureRef);
+                            }, immFlushSchedFutureRefUpdater);
                         }
                     }
                 }
             }
 
             // update last dlsn before satisifying future
-            if (BKException.Code.OK == transmitResult.get()) {
+            if (BKException.Code.OK == transmitResultUpdater.get(this)) {
                 DLSN lastDLSNInPacket = recordSet.finalizeTransmit(
                         logSegmentSequenceNumber, entryId);
                 if (recordSet.hasUserRecords()) {
@@ -1291,10 +1310,10 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             }
         }
 
-        if (BKException.Code.OK == transmitResult.get()) {
+        if (BKException.Code.OK == transmitResultUpdater.get(this)) {
             recordSet.completeTransmit(logSegmentSequenceNumber, entryId);
         } else {
-            
recordSet.abortTransmit(Utils.transmitException(transmitResult.get()));
+            
recordSet.abortTransmit(Utils.transmitException(transmitResultUpdater.get(this)));
         }
 
         if (cancelPendingPromises) {
@@ -1308,7 +1327,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             }
             packetCurrentSaved.getRecordSet().abortTransmit(
                     new WriteCancelledException(streamName,
-                            Utils.transmitException(transmitResult.get())));
+                            
Utils.transmitException(transmitResultUpdater.get(this))));
         }
     }
 
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index d62c8e7..898c113 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -30,9 +30,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -220,12 +220,12 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
          */
         boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) {
             if (BKException.Code.OK == rc) {
-                numReadErrors.set(0);
+                numReadErrorsUpdater.set(BKLogSegmentEntryReader.this, 0);
                 return true;
             }
             if (BKException.Code.BookieHandleNotAvailableException == rc
                     || (isLongPoll && 
BKException.Code.NoSuchLedgerExistsException == rc)) {
-                int numErrors = Math.max(1, numReadErrors.incrementAndGet());
+                int numErrors = Math.max(1, 
numReadErrorsUpdater.incrementAndGet(BKLogSegmentEntryReader.this));
                 int nextReadBackoffTime = Math.min(numErrors * 
readAheadWaitTime, maxReadBackoffTime);
                 scheduler.scheduleOrdered(
                         getSegment().getLogSegmentId(),
@@ -301,15 +301,21 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
     private final List<LedgerHandle> openLedgerHandles;
     private CacheEntry outstandingLongPoll;
     private long nextEntryId;
-    private final AtomicReference<Throwable> lastException = new 
AtomicReference<Throwable>(null);
-    private final AtomicLong scheduleCount = new AtomicLong(0);
+    private static final AtomicReferenceFieldUpdater<BKLogSegmentEntryReader, 
Throwable> lastExceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, 
Throwable.class, "lastException");
+    private volatile Throwable lastException = null;
+    private static final AtomicLongFieldUpdater<BKLogSegmentEntryReader> 
scheduleCountUpdater =
+        AtomicLongFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, 
"scheduleCount");
+    private volatile long scheduleCount = 0L;
     private volatile boolean hasCaughtupOnInprogress = false;
     private final CopyOnWriteArraySet<StateChangeListener> 
stateChangeListeners =
             new CopyOnWriteArraySet<StateChangeListener>();
     // read retries
     private int readAheadWaitTime;
     private final int maxReadBackoffTime;
-    private final AtomicInteger numReadErrors = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<BKLogSegmentEntryReader> 
numReadErrorsUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, 
"numReadErrors");
+    private volatile int numReadErrors = 0;
     private final boolean skipBrokenEntries;
     // readahead cache
     int cachedEntries = 0;
@@ -493,8 +499,9 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
     //
 
     private boolean checkClosedOrInError() {
-        if (null != lastException.get()) {
-            cancelAllPendingReads(lastException.get());
+        Throwable cause = lastExceptionUpdater.get(this);
+        if (null != cause) {
+            cancelAllPendingReads(cause);
             return true;
         }
         return false;
@@ -507,7 +514,7 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
      * @param isBackground is the reader set exception by background reads or 
foreground reads
      */
     private void completeExceptionally(Throwable throwable, boolean 
isBackground) {
-        lastException.compareAndSet(null, throwable);
+        lastExceptionUpdater.compareAndSet(this, null, throwable);
         if (isBackground) {
             notifyReaders();
         }
@@ -662,7 +669,7 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
         final PendingReadRequest readRequest = new 
PendingReadRequest(numEntries);
 
         if (checkClosedOrInError()) {
-            readRequest.completeExceptionally(lastException.get());
+            readRequest.completeExceptionally(lastExceptionUpdater.get(this));
         } else {
             boolean wasQueueEmpty;
             synchronized (readQueue) {
@@ -682,7 +689,7 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
             return;
         }
 
-        long prevCount = scheduleCount.getAndIncrement();
+        long prevCount = scheduleCountUpdater.getAndIncrement(this);
         if (0 == prevCount) {
             scheduler.submitOrdered(getSegment().getLogSegmentId(), this);
         }
@@ -693,7 +700,7 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
      */
     @Override
     public void safeRun() {
-        long scheduleCountLocal = scheduleCount.get();
+        long scheduleCountLocal = scheduleCountUpdater.get(this);
         while (true) {
             PendingReadRequest nextRequest = null;
             synchronized (readQueue) {
@@ -702,14 +709,14 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
 
             // if read queue is empty, nothing to read, return
             if (null == nextRequest) {
-                scheduleCount.set(0L);
+                scheduleCountUpdater.set(this, 0L);
                 return;
             }
 
             // if the oldest pending promise is interrupted then we must
             // mark the reader in error and abort all pending reads since
             // we don't know the last consumed read
-            if (null == lastException.get()) {
+            if (null == lastExceptionUpdater.get(this)) {
                 if (nextRequest.getPromise().isCancelled()) {
                     completeExceptionally(new 
DLInterruptedException("Interrupted on reading log segment "
                             + getSegment() + " : " + 
nextRequest.getPromise().isCancelled()), false);
@@ -745,7 +752,7 @@ public class BKLogSegmentEntryReader implements 
SafeRunnable, LogSegmentEntryRea
                 if (0 == scheduleCountLocal) {
                     return;
                 }
-                scheduleCountLocal = scheduleCount.decrementAndGet();
+                scheduleCountLocal = 
scheduleCountUpdater.decrementAndGet(this);
             }
         }
     }
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
index c680000..5a56517 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
@@ -20,7 +20,7 @@ package org.apache.distributedlog.impl.subscription;
 import com.google.common.base.Charsets;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -42,7 +42,9 @@ public class ZKSubscriptionStateStore implements 
SubscriptionStateStore {
 
     private final ZooKeeperClient zooKeeperClient;
     private final String zkPath;
-    private AtomicReference<DLSN> lastCommittedPosition = new 
AtomicReference<DLSN>(null);
+    private static final AtomicReferenceFieldUpdater<ZKSubscriptionStateStore, 
DLSN> lastCommittedPositionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(ZKSubscriptionStateStore.class, 
DLSN.class, "lastCommittedPosition");
+    private volatile DLSN lastCommittedPosition = null;
 
     public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String 
zkPath) {
         this.zooKeeperClient = zooKeeperClient;
@@ -58,8 +60,9 @@ public class ZKSubscriptionStateStore implements 
SubscriptionStateStore {
      */
     @Override
     public CompletableFuture<DLSN> getLastCommitPosition() {
-        if (null != lastCommittedPosition.get()) {
-            return FutureUtils.value(lastCommittedPosition.get());
+        DLSN dlsn = lastCommittedPositionUpdater.get(this);
+        if (null != dlsn) {
+            return FutureUtils.value(dlsn);
         } else {
             return getLastCommitPositionFromZK();
         }
@@ -105,9 +108,10 @@ public class ZKSubscriptionStateStore implements 
SubscriptionStateStore {
      */
     @Override
     public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition) {
-        if (null == lastCommittedPosition.get()
-                || (newPosition.compareTo(lastCommittedPosition.get()) > 0)) {
-            lastCommittedPosition.set(newPosition);
+        DLSN dlsn = lastCommittedPositionUpdater.get(this);
+        if (null == dlsn
+                || (newPosition.compareTo(dlsn) > 0)) {
+            lastCommittedPositionUpdater.set(this, newPosition);
             return 
Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient,
                 zkPath, newPosition.serialize().getBytes(Charsets.UTF_8),
                 zooKeeperClient.getDefaultACL(),
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 81d721b..7948084 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -92,7 +93,9 @@ public class ZKDistributedLock implements LockListener, 
DistributedLock {
     private CompletableFuture<Void> closeFuture = null;
 
     // A counter to track how many re-acquires happened during a lock's life 
cycle.
-    private final AtomicInteger reacquireCount = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<ZKDistributedLock> 
reacquireCountUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(ZKDistributedLock.class, 
"reacquireCount");
+    private volatile int reacquireCount = 0;
     private final StatsLogger lockStatsLogger;
     private final OpStatsLogger acquireStats;
     private final OpStatsLogger reacquireStats;
@@ -323,7 +326,7 @@ public class ZKDistributedLock implements LockListener, 
DistributedLock {
 
     @VisibleForTesting
     int getReacquireCount() {
-        return reacquireCount.get();
+        return reacquireCountUpdater.get(this);
     }
 
     @VisibleForTesting
@@ -511,7 +514,7 @@ public class ZKDistributedLock implements LockListener, 
DistributedLock {
                 }
             });
         }
-        reacquireCount.incrementAndGet();
+        reacquireCountUpdater.incrementAndGet(this);
         internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0, 
lockPromise);
         return lockPromise;
     }
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index f018e36..3d79336 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -31,7 +31,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -274,7 +274,9 @@ class ZKSessionLock implements SessionLock {
     private String currentNode;
     private String watchedNode;
     private LockWatcher watcher;
-    private final AtomicInteger epoch = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<ZKSessionLock> epochUpdater 
=
+        AtomicIntegerFieldUpdater.newUpdater(ZKSessionLock.class, "epoch");
+    private volatile int epoch = 0;
     private final OrderedScheduler lockStateExecutor;
     private LockListener lockListener = null;
     private final long lockOpTimeout;
@@ -358,9 +360,8 @@ class ZKSessionLock implements SessionLock {
         return this.lockPath;
     }
 
-    @VisibleForTesting
-    AtomicInteger getEpoch() {
-        return epoch;
+    int getEpoch() {
+        return epochUpdater.get(this);
     }
 
     @VisibleForTesting
@@ -394,7 +395,7 @@ class ZKSessionLock implements SessionLock {
         lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
-                if (ZKSessionLock.this.epoch.get() == lockEpoch) {
+                if (getEpoch() == lockEpoch) {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("{} executing lock action '{}' under epoch 
{} for lock {}",
                                 new Object[]{lockId, func.getActionName(), 
lockEpoch, lockPath});
@@ -409,7 +410,7 @@ class ZKSessionLock implements SessionLock {
                         LOG.trace("{} skipped executing lock action '{}' for 
lock {},"
                                         + " since epoch is changed from {} to 
{}.",
                                 new Object[]{lockId, func.getActionName(),
-                                        lockPath, lockEpoch, 
ZKSessionLock.this.epoch.get()});
+                                        lockPath, lockEpoch, getEpoch()});
                     }
                 }
             }
@@ -433,7 +434,7 @@ class ZKSessionLock implements SessionLock {
         lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
-                int currentEpoch = ZKSessionLock.this.epoch.get();
+                int currentEpoch = getEpoch();
                 if (currentEpoch == lockEpoch) {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("{} executed lock action '{}' under epoch {} 
for lock {}",
@@ -656,7 +657,7 @@ class ZKSessionLock implements SessionLock {
             return false;
         }
         // current owner is itself
-        final int curEpoch = epoch.incrementAndGet();
+        final int curEpoch = epochUpdater.incrementAndGet(this);
         executeLockAction(curEpoch, new LockAction() {
             @Override
             public void execute() {
@@ -736,7 +737,7 @@ class ZKSessionLock implements SessionLock {
      *          promise to satisfy with current lock owner.
      */
     private void asyncTryLockWithoutCleanup(final boolean wait, final 
CompletableFuture<String> promise) {
-        executeLockAction(epoch.get(), new LockAction() {
+        executeLockAction(getEpoch(), new LockAction() {
             @Override
             public void execute() {
                 if (!lockState.inState(State.INIT)) {
@@ -746,7 +747,7 @@ class ZKSessionLock implements SessionLock {
                 }
                 lockState.transition(State.PREPARING);
 
-                final int curEpoch = epoch.incrementAndGet();
+                final int curEpoch = 
epochUpdater.incrementAndGet(ZKSessionLock.this);
                 watcher = new LockWatcher(curEpoch);
                 // register watcher for session expires
                 zkClient.register(watcher);
@@ -913,7 +914,7 @@ class ZKSessionLock implements SessionLock {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Notify lock waiters on {} at {} : watcher epoch {}, 
lock epoch {}",
                     new Object[] { lockPath, System.currentTimeMillis(),
-                            lockEpoch, ZKSessionLock.this.epoch.get() });
+                            lockEpoch, getEpoch() });
         }
         acquireFuture.complete(true);
     }
@@ -924,7 +925,7 @@ class ZKSessionLock implements SessionLock {
     private void unlockInternal(final CompletableFuture<Void> promise) {
 
         // already closed or expired, nothing to cleanup
-        this.epoch.incrementAndGet();
+        this.epochUpdater.incrementAndGet(this);
         if (null != watcher) {
             this.zkClient.unregister(watcher);
         }
@@ -1026,7 +1027,7 @@ class ZKSessionLock implements SessionLock {
                 }
 
                 // increment epoch to avoid any ongoing locking action
-                ZKSessionLock.this.epoch.incrementAndGet();
+                epochUpdater.incrementAndGet(ZKSessionLock.this);
 
                 // if session expired, just notify the waiter. as the lock 
acquire doesn't succeed.
                 // we don't even need to clean up the lock as the znode will 
disappear after session expired
@@ -1326,7 +1327,7 @@ class ZKSessionLock implements SessionLock {
         @Override
         public void process(WatchedEvent event) {
             LOG.debug("Received event {} from lock {} at {} : watcher epoch 
{}, lock epoch {}.",
-                    new Object[] {event, lockPath, System.currentTimeMillis(), 
epoch, ZKSessionLock.this.epoch.get() });
+                    new Object[] {event, lockPath, System.currentTimeMillis(), 
epoch, getEpoch() });
             if (event.getType() == Watcher.Event.EventType.None) {
                 switch (event.getState()) {
                     case SyncConnected:
@@ -1334,7 +1335,7 @@ class ZKSessionLock implements SessionLock {
                     case Expired:
                         LOG.info("Session {} is expired for lock {} at {} : 
watcher epoch {}, lock epoch {}.",
                                 new Object[] { lockId.getRight(), lockPath, 
System.currentTimeMillis(),
-                                        epoch, ZKSessionLock.this.epoch.get() 
});
+                                        epoch, getEpoch() });
                         handleSessionExpired(epoch);
                         break;
                     default:
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
index b5bc499..5a076e3 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
@@ -18,7 +18,7 @@
 package org.apache.distributedlog.util;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
@@ -41,7 +41,9 @@ public class SimplePermitLimiter implements PermitLimiter {
 
     final Counter acquireFailureCounter;
     final OpStatsLogger permitsMetric;
-    final AtomicInteger permits;
+    private static final AtomicIntegerFieldUpdater<SimplePermitLimiter> 
permitsUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(SimplePermitLimiter.class, 
"permits");
+    volatile int permits = 0;
     final int permitsMax;
     final boolean darkmode;
     final Feature disableWriteLimitFeature;
@@ -51,7 +53,6 @@ public class SimplePermitLimiter implements PermitLimiter {
 
     public SimplePermitLimiter(boolean darkmode, int permitsMax, StatsLogger 
statsLogger,
                                boolean singleton, Feature 
disableWriteLimitFeature) {
-        this.permits = new AtomicInteger(0);
         this.permitsMax = permitsMax;
         this.darkmode = darkmode;
         this.disableWriteLimitFeature = disableWriteLimitFeature;
@@ -66,7 +67,7 @@ public class SimplePermitLimiter implements PermitLimiter {
                 }
                 @Override
                 public Number getSample() {
-                    return permits.get();
+                    return permitsUpdater.get(SimplePermitLimiter.this);
                 }
             };
             this.permitsGaugeLabel = "permits";
@@ -82,19 +83,19 @@ public class SimplePermitLimiter implements PermitLimiter {
 
     @Override
     public boolean acquire() {
-        permitsMetric.registerSuccessfulValue(permits.get());
-        if (permits.incrementAndGet() <= permitsMax || isDarkmode()) {
+        permitsMetric.registerSuccessfulValue(permitsUpdater.get(this));
+        if (permitsUpdater.incrementAndGet(this) <= permitsMax || 
isDarkmode()) {
             return true;
         } else {
             acquireFailureCounter.inc();
-            permits.decrementAndGet();
+            permitsUpdater.decrementAndGet(this);
             return false;
         }
     }
 
     @Override
     public void release(int permitsToRelease) {
-        permits.addAndGet(-permitsToRelease);
+        permitsUpdater.addAndGet(this, -permitsToRelease);
     }
 
     @Override
@@ -104,7 +105,7 @@ public class SimplePermitLimiter implements PermitLimiter {
 
     @VisibleForTesting
     public int getPermits() {
-        return permits.get();
+        return permitsUpdater.get(this);
     }
 
     public void unregisterGauge() {
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
index bd731a6..211d10c 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
@@ -21,7 +21,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -44,14 +44,14 @@ public class LimitedPermitManager implements PermitManager, 
Runnable, Watcher {
         ALLOWED, DISALLOWED, DISABLED
     }
 
-    class EpochPermit implements Permit {
+    static class EpochPermit implements Permit {
 
         final PermitState state;
         final int epoch;
 
-        EpochPermit(PermitState state) {
+        EpochPermit(PermitState state, int epoch) {
             this.state = state;
-            this.epoch = LimitedPermitManager.this.epoch.get();
+            this.epoch = epoch;
         }
 
         int getEpoch() {
@@ -69,7 +69,9 @@ public class LimitedPermitManager implements PermitManager, 
Runnable, Watcher {
     final int period;
     final TimeUnit timeUnit;
     final ScheduledExecutorService executorService;
-    final AtomicInteger epoch = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<LimitedPermitManager> 
epochUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(LimitedPermitManager.class, 
"epoch");
+    volatile int epoch = 0;
     private StatsLogger statsLogger = null;
     private Gauge<Number> outstandingGauge = null;
 
@@ -106,13 +108,13 @@ public class LimitedPermitManager implements 
PermitManager, Runnable, Watcher {
     @Override
     public synchronized Permit acquirePermit() {
         if (!enablePermits) {
-            return new EpochPermit(PermitState.DISABLED);
+            return new EpochPermit(PermitState.DISABLED, 
epochUpdater.get(this));
         }
         if (null != semaphore) {
-            return semaphore.tryAcquire() ? new 
EpochPermit(PermitState.ALLOWED) :
-                    new EpochPermit(PermitState.DISALLOWED);
+            return semaphore.tryAcquire() ? new 
EpochPermit(PermitState.ALLOWED, epochUpdater.get(this)) :
+                    new EpochPermit(PermitState.DISALLOWED, 
epochUpdater.get(this));
         } else {
-            return new EpochPermit(PermitState.ALLOWED);
+            return new EpochPermit(PermitState.ALLOWED, 
epochUpdater.get(this));
         }
     }
 
@@ -138,9 +140,10 @@ public class LimitedPermitManager implements 
PermitManager, Runnable, Watcher {
         if (!(permit instanceof EpochPermit)) {
             return false;
         }
-        if (epoch.getAndIncrement() == ((EpochPermit) permit).getEpoch()) {
+        int epoch = epochUpdater.getAndIncrement(this);
+        if (epoch == ((EpochPermit) permit).getEpoch()) {
             this.enablePermits = false;
-            LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, 
epoch.get());
+            LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, 
epoch);
             return true;
         } else {
             return false;
@@ -159,9 +162,9 @@ public class LimitedPermitManager implements PermitManager, 
Runnable, Watcher {
     }
 
     synchronized void forceSetAllowPermits(boolean allowPermits) {
-        epoch.getAndIncrement();
+        int epoch = epochUpdater.getAndIncrement(this);
         this.enablePermits = allowPermits;
-        LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, 
epoch.get());
+        LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch);
     }
 
     @Override
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
index fe8eed4..8c350c9 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
@@ -21,7 +21,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -89,7 +89,7 @@ public class ZKWatcherManager implements Watcher {
     private static final String numChildWatchesGauageLabel = 
"num_child_watches";
 
     protected final ConcurrentMap<String, Set<Watcher>> childWatches;
-    protected final AtomicInteger allWatchesGauge;
+    protected final LongAdder allWatchesGauge;
 
     private ZKWatcherManager(String name,
                              ZooKeeperClient zkc,
@@ -100,7 +100,7 @@ public class ZKWatcherManager implements Watcher {
 
         // watches
         this.childWatches = new ConcurrentHashMap<String, Set<Watcher>>();
-        this.allWatchesGauge = new AtomicInteger(0);
+        this.allWatchesGauge = new LongAdder();
 
         // stats
         totalWatchesGauge = new Gauge<Number>() {
@@ -111,7 +111,7 @@ public class ZKWatcherManager implements Watcher {
 
             @Override
             public Number getSample() {
-                return allWatchesGauge.get();
+                return allWatchesGauge.sum();
             }
         };
         this.statsLogger.registerGauge(totalWatchesGauageLabel, 
totalWatchesGauge);
@@ -141,7 +141,7 @@ public class ZKWatcherManager implements Watcher {
         synchronized (watchers) {
             if (childWatches.get(path) == watchers) {
                 if (watchers.add(watcher)) {
-                    allWatchesGauge.incrementAndGet();
+                    allWatchesGauge.increment();
                 }
             } else {
                 logger.warn("Watcher set for path {} has been changed while 
registering child watcher {}.",
@@ -160,7 +160,7 @@ public class ZKWatcherManager implements Watcher {
         }
         synchronized (watchers) {
             if (watchers.remove(watcher)) {
-                allWatchesGauge.decrementAndGet();
+                allWatchesGauge.decrement();
             } else {
                 logger.warn("Remove a non-registered child watcher {} from 
path {}", watcher, path);
             }
@@ -212,7 +212,7 @@ public class ZKWatcherManager implements Watcher {
     }
 
     private void handleKeeperStateEvent(WatchedEvent event) {
-        Set<Watcher> savedAllWatches = new 
HashSet<Watcher>(allWatchesGauge.get());
+        Set<Watcher> savedAllWatches = new HashSet<Watcher>((int) 
allWatchesGauge.sum());
         for (Set<Watcher> watcherSet : childWatches.values()) {
             synchronized (watcherSet) {
                 savedAllWatches.addAll(watcherSet);
diff --git 
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
 
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index ad1ebc1..901d2a0 100644
--- 
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++ 
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -221,7 +221,7 @@ public class TestZKSessionLock extends 
ZooKeeperClusterTestCase {
 
         // lock action would be executed in same epoch
         final CountDownLatch latch1 = new CountDownLatch(1);
-        lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
+        lock.executeLockAction(lock.getEpoch(), new LockAction() {
             @Override
             public void execute() {
                 counter.incrementAndGet();
@@ -238,7 +238,7 @@ public class TestZKSessionLock extends 
ZooKeeperClusterTestCase {
 
         // lock action would not be executed in same epoch
         final CountDownLatch latch2 = new CountDownLatch(1);
-        lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
+        lock.executeLockAction(lock.getEpoch() + 1, new LockAction() {
             @Override
             public void execute() {
                 counter.incrementAndGet();
@@ -249,7 +249,7 @@ public class TestZKSessionLock extends 
ZooKeeperClusterTestCase {
                 return "increment2";
             }
         });
-        lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
+        lock.executeLockAction(lock.getEpoch(), new LockAction() {
             @Override
             public void execute() {
                 latch2.countDown();
@@ -265,7 +265,7 @@ public class TestZKSessionLock extends 
ZooKeeperClusterTestCase {
 
         // lock action would not be executed in same epoch and promise would 
be satisfied with exception
         CompletableFuture<Void> promise = new CompletableFuture<Void>();
-        lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
+        lock.executeLockAction(lock.getEpoch() + 1, new LockAction() {
             @Override
             public void execute() {
                 counter.incrementAndGet();
diff --git 
a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
 
b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
index e6d076e..b0b5bb1 100644
--- 
a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
+++ 
b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
@@ -25,7 +25,7 @@ import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -52,7 +52,9 @@ class DLOutputStream extends OutputStream {
     private long writePos = 0L;
 
     // state
-    private final AtomicReference<Throwable> exception = new 
AtomicReference<>(null);
+    private static final AtomicReferenceFieldUpdater<DLOutputStream, 
Throwable> exceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, 
Throwable.class, "exception");
+    private volatile Throwable exception = null;
 
     DLOutputStream(DistributedLogManager dlm,
                    AsyncLogWriter writer) {
@@ -83,7 +85,7 @@ class DLOutputStream extends OutputStream {
     }
 
     private synchronized void write(ByteBuf buf) throws IOException {
-        Throwable cause = exception.get();
+        Throwable cause = exceptionUpdater.get(this);
         if (null != cause) {
             if (cause instanceof IOException) {
                 throw (IOException) cause;
@@ -104,7 +106,7 @@ class DLOutputStream extends OutputStream {
 
             @Override
             public void onFailure(Throwable cause) {
-                exception.compareAndSet(null, cause);
+                exceptionUpdater.compareAndSet(DLOutputStream.this, null, 
cause);
             }
         });
     }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to