This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new db713f3 [DLOG] use Atomic***FieldUpdater and LongAdder if possible
db713f3 is described below
commit db713f32c24cc9345d8b813aeef9d079b254d16a
Author: Sijie Guo <[email protected]>
AuthorDate: Wed Mar 28 22:17:00 2018 -0700
[DLOG] use Atomic***FieldUpdater and LongAdder if possible
Descriptions of the changes in this PR:
AtomicFieldUpdater + volatile provides similar guarantees as Atomic but use
much fewer memory.
In dlog, when handling large number of streams, there can be a lot of
Atomic fields with `LogHandler`, `SegmentWriter` and such.
Switching using Atomic to using AtomicFieldUpdater + volatile will save a
lot of memory. See
[details](http://normanmaurer.me/blog/2013/10/28/Lesser-known-concurrent-classes-Part-1/)
LongAdder is less contended across threads, which is more preferable to
AtomicLong when multiple threads update a common sum. E.g.
SampledMovingAverageRate.
Switching to use LongAdder if AtomicLong is not needed. See
[details](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/LongAdder.html)
Master Issue: #<master-issue-number>
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai
<[email protected]>, Philip Su <[email protected]>
This closes #1299 from sijie/use_atomic_field_updater
---
.../common/rate/SampledMovingAverageRate.java | 10 +-
.../apache/distributedlog/BKAsyncLogReader.java | 44 +++---
.../org/apache/distributedlog/BKLogHandler.java | 13 +-
.../apache/distributedlog/BKLogReadHandler.java | 4 +-
.../apache/distributedlog/BKLogSegmentWriter.java | 151 ++++++++++++---------
.../impl/logsegment/BKLogSegmentEntryReader.java | 41 +++---
.../subscription/ZKSubscriptionStateStore.java | 18 ++-
.../distributedlog/lock/ZKDistributedLock.java | 9 +-
.../apache/distributedlog/lock/ZKSessionLock.java | 33 ++---
.../distributedlog/util/SimplePermitLimiter.java | 19 +--
.../distributedlog/zk/LimitedPermitManager.java | 29 ++--
.../apache/distributedlog/zk/ZKWatcherManager.java | 14 +-
.../distributedlog/lock/TestZKSessionLock.java | 8 +-
.../apache/distributedlog/fs/DLOutputStream.java | 10 +-
14 files changed, 226 insertions(+), 177 deletions(-)
diff --git
a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
index 2c89d64..d8c72af 100644
---
a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
+++
b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
@@ -20,7 +20,7 @@ package org.apache.distributedlog.common.rate;
import com.google.common.base.Ticker;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.tuple.Pair;
/**
@@ -30,7 +30,7 @@ class SampledMovingAverageRate implements MovingAverageRate {
private static final long NANOS_PER_SEC = TimeUnit.SECONDS.toNanos(1);
- private final AtomicLong total;
+ private final LongAdder total;
private final Ticker ticker;
private final double scaleFactor;
private final LinkedBlockingDeque<Pair<Long, Long>> samples;
@@ -45,7 +45,7 @@ class SampledMovingAverageRate implements MovingAverageRate {
double scaleFactor,
Ticker ticker) {
this.value = 0;
- this.total = new AtomicLong(0);
+ this.total = new LongAdder();
this.scaleFactor = scaleFactor;
this.ticker = ticker;
this.samples = new LinkedBlockingDeque<>(intervalSecs);
@@ -58,7 +58,7 @@ class SampledMovingAverageRate implements MovingAverageRate {
@Override
public void add(long amount) {
- total.getAndAdd(amount);
+ total.add(amount);
}
@Override
@@ -71,7 +71,7 @@ class SampledMovingAverageRate implements MovingAverageRate {
}
private double doSample() {
- long newSample = total.get();
+ long newSample = total.sum();
long newTimestamp = ticker.read();
double rate = 0;
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 3373896..6adcbc4 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -28,8 +28,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -77,12 +77,16 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
private final String streamName;
protected final BKDistributedLogManager bkDistributedLogManager;
protected final BKLogReadHandler readHandler;
- private final AtomicReference<Throwable> lastException = new
AtomicReference<Throwable>();
+ private static final AtomicReferenceFieldUpdater<BKAsyncLogReader,
Throwable> lastExceptionUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(BKAsyncLogReader.class,
Throwable.class, "lastException");
+ private volatile Throwable lastException = null;
private final OrderedScheduler scheduler;
private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests =
new ConcurrentLinkedQueue<PendingReadRequest>();
private final Object scheduleLock = new Object();
- private final AtomicLong scheduleCount = new AtomicLong(0);
+ private static final AtomicLongFieldUpdater<BKAsyncLogReader>
scheduleCountUpdater =
+ AtomicLongFieldUpdater.newUpdater(BKAsyncLogReader.class,
"scheduleCount");
+ private volatile long scheduleCount = 0L;
private final Stopwatch scheduleDelayStopwatch;
private final Stopwatch readNextDelayStopwatch;
private DLSN startDLSN;
@@ -340,7 +344,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
}
private boolean checkClosedOrInError(String operation) {
- if (null == lastException.get()) {
+ if (null == lastExceptionUpdater.get(this)) {
try {
if (null != readHandler && null != getReadAheadReader()) {
getReadAheadReader().checkLastException();
@@ -360,9 +364,10 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
}
}
- if (null != lastException.get()) {
+ Throwable cause = lastExceptionUpdater.get(this);
+ if (null != cause) {
LOG.trace("Cancelling pending reads");
- cancelAllPendingReads(lastException.get());
+ cancelAllPendingReads(cause);
return true;
}
@@ -370,7 +375,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
}
private void setLastException(IOException exc) {
- lastException.compareAndSet(null, exc);
+ lastExceptionUpdater.compareAndSet(this, null, exc);
}
@Override
@@ -446,7 +451,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
}
if (checkClosedOrInError("readNext")) {
- readRequest.completeExceptionally(lastException.get());
+ readRequest.completeExceptionally(lastExceptionUpdater.get(this));
} else {
boolean queueEmpty = pendingRequests.isEmpty();
pendingRequests.add(readRequest);
@@ -469,7 +474,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
return;
}
- long prevCount = scheduleCount.getAndIncrement();
+ long prevCount = scheduleCountUpdater.getAndIncrement(this);
if (0 == prevCount) {
scheduleDelayStopwatch.reset().start();
scheduler.submitOrdered(streamName, this);
@@ -574,7 +579,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
Stopwatch runTime = Stopwatch.createStarted();
int iterations = 0;
- long scheduleCountLocal = scheduleCount.get();
+ long scheduleCountLocal = scheduleCountUpdater.get(this);
LOG.debug("{}: Scheduled Background Reader",
readHandler.getFullyQualifiedName());
while (true) {
if (LOG.isTraceEnabled()) {
@@ -588,7 +593,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
// Queue is empty, nothing to read, return
if (null == nextRequest) {
LOG.trace("{}: Queue Empty waiting for Input",
readHandler.getFullyQualifiedName());
- scheduleCount.set(0);
+ scheduleCountUpdater.set(this, 0);
backgroundReaderRunTime.registerSuccessfulEvent(
runTime.stop().elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
return;
@@ -605,7 +610,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
// If the oldest pending promise is interrupted then we must
mark
// the reader in error and abort all pending reads since we
dont
// know the last consumed read
- if (null == lastException.get()) {
+ if (null == lastExceptionUpdater.get(this)) {
if (nextRequest.getPromise().isCancelled()) {
setLastException(new
DLInterruptedException("Interrupted on reading "
+ readHandler.getFullyQualifiedName()));
@@ -613,8 +618,9 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
}
if (checkClosedOrInError("readNext")) {
- if (!(lastException.get().getCause() instanceof
LogNotFoundException)) {
- LOG.warn("{}: Exception",
readHandler.getFullyQualifiedName(), lastException.get());
+ Throwable lastException = lastExceptionUpdater.get(this);
+ if (lastException != null && !(lastException.getCause()
instanceof LogNotFoundException)) {
+ LOG.warn("{}: Exception",
readHandler.getFullyQualifiedName(), lastException);
}
backgroundReaderRunTime.registerFailedEvent(
runTime.stop().elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
@@ -659,7 +665,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
setLastException(exc);
if (!(exc instanceof LogNotFoundException)) {
LOG.warn("{} : read with skip Exception",
- readHandler.getFullyQualifiedName(),
lastException.get());
+ readHandler.getFullyQualifiedName(),
lastExceptionUpdater.get(this));
}
continue;
}
@@ -670,7 +676,7 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
backgroundReaderRunTime.registerSuccessfulEvent(
runTime.stop().elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
scheduleDelayStopwatch.reset().start();
- scheduleCount.set(0);
+ scheduleCountUpdater.set(this, 0);
// the request could still wait for more records
backgroundScheduleTask = scheduler.scheduleOrdered(
streamName,
@@ -702,12 +708,12 @@ class BKAsyncLogReader implements AsyncLogReader,
SafeRunnable, AsyncNotificatio
}
} else {
if (0 == scheduleCountLocal) {
- LOG.trace("Schedule count dropping to zero",
lastException.get());
+ LOG.trace("Schedule count dropping to zero",
lastExceptionUpdater.get(this));
backgroundReaderRunTime.registerSuccessfulEvent(
runTime.stop().elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
return;
}
- scheduleCountLocal = scheduleCount.decrementAndGet();
+ scheduleCountLocal =
scheduleCountUpdater.decrementAndGet(this);
}
}
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
index afcd7bb..a319d44 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
@@ -31,7 +31,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -97,7 +97,9 @@ abstract class BKLogHandler implements AsyncCloseable,
AsyncAbortable {
protected final AlertStatsLogger alertStatsLogger;
protected volatile boolean reportGetSegmentStats = false;
private final String lockClientId;
- protected final AtomicReference<IOException> metadataException = new
AtomicReference<IOException>(null);
+ protected static final AtomicReferenceFieldUpdater<BKLogHandler,
IOException> METADATA_EXCEPTION_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(BKLogHandler.class,
IOException.class, "metadataException");
+ private volatile IOException metadataException = null;
// Maintain the list of log segments per stream
protected final PerStreamLogSegmentCache logSegmentCache;
@@ -155,8 +157,9 @@ abstract class BKLogHandler implements AsyncCloseable,
AsyncAbortable {
}
BKLogHandler checkMetadataException() throws IOException {
- if (null != metadataException.get()) {
- throw metadataException.get();
+ IOException ioe = METADATA_EXCEPTION_UPDATER.get(this);
+ if (null != ioe) {
+ throw ioe;
}
return this;
}
@@ -480,7 +483,7 @@ abstract class BKLogHandler implements AsyncCloseable,
AsyncAbortable {
// the log segments cache went wrong
LOG.error("Unexpected exception on getting log segments from the
cache for stream {}",
getFullyQualifiedName(), ue);
- metadataException.compareAndSet(null, ue);
+ METADATA_EXCEPTION_UPDATER.compareAndSet(this, null, ue);
throw ue;
}
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
index 66ca2de..1175f39 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
@@ -272,7 +272,7 @@ class BKLogReadHandler extends BKLogHandler implements
LogSegmentNamesListener {
|| cause instanceof LogSegmentNotFoundException
|| cause instanceof UnexpectedException) {
// indicate some inconsistent behavior, abort
- metadataException.compareAndSet(null, (IOException) cause);
+
METADATA_EXCEPTION_UPDATER.compareAndSet(BKLogReadHandler.this, null,
(IOException) cause);
// notify the reader that read handler is in error state
notifyReaderOnError(cause);
FutureUtils.completeExceptionally(promise, cause);
@@ -318,7 +318,7 @@ class BKLogReadHandler extends BKLogHandler implements
LogSegmentNamesListener {
|| cause instanceof LogSegmentNotFoundException
|| cause instanceof UnexpectedException) {
// indicate some inconsistent behavior, abort
- metadataException.compareAndSet(null, (IOException) cause);
+
METADATA_EXCEPTION_UPDATER.compareAndSet(BKLogReadHandler.this, null,
(IOException) cause);
// notify the reader that read handler is in error state
notifyReaderOnError(cause);
return;
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 6263ddf..4ad977b 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -34,8 +34,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.distributedlog.Entry.Writer;
import org.apache.distributedlog.common.stats.OpStatsListener;
import org.apache.distributedlog.common.util.PermitLimiter;
@@ -158,13 +159,16 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
private final int logSegmentMetadataVersion;
private BKTransmitPacket packetPrevious;
private Entry.Writer recordSetWriter;
- private final AtomicInteger outstandingTransmits;
+ private static final AtomicIntegerFieldUpdater<BKLogSegmentWriter>
outstandingTransmitsUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentWriter.class,
"outstandingTransmits");
+ private volatile int outstandingTransmits = 0;
private final int transmissionThreshold;
protected final LogSegmentEntryWriter entryWriter;
private final CompressionCodec.Type compressionType;
private final ReentrantLock transmitLock = new ReentrantLock();
- private final AtomicInteger transmitResult =
- new AtomicInteger(BKException.Code.OK);
+ private static final AtomicIntegerFieldUpdater<BKLogSegmentWriter>
transmitResultUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentWriter.class,
"transmitResult");
+ private volatile int transmitResult = BKException.Code.OK;
private final DistributedLock lock;
private final boolean isDurableWriteEnabled;
private DLSN lastDLSN = DLSN.InvalidDLSN;
@@ -188,11 +192,24 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
private boolean streamEnded = false;
private final ScheduledFuture<?> periodicFlushSchedule;
private final ScheduledFuture<?> periodicKeepAliveSchedule;
- private final AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef =
- new AtomicReference<ScheduledFuture<?>>(null);
- private final AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef =
- new AtomicReference<ScheduledFuture<?>>(null);
- private final AtomicReference<Exception> scheduledFlushException = new
AtomicReference<Exception>(null);
+ private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter,
ScheduledFuture>
+ transmitSchedFutureRefUpdater = AtomicReferenceFieldUpdater.newUpdater(
+ BKLogSegmentWriter.class,
+ ScheduledFuture.class,
+ "transmitSchedFutureRef");
+ private volatile ScheduledFuture transmitSchedFutureRef = null;
+ private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter,
ScheduledFuture>
+ immFlushSchedFutureRefUpdater = AtomicReferenceFieldUpdater.newUpdater(
+ BKLogSegmentWriter.class,
+ ScheduledFuture.class,
+ "immFlushSchedFutureRef");
+ private volatile ScheduledFuture immFlushSchedFutureRef = null;
+ private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter,
Exception> scheduledFlushExceptionUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(
+ BKLogSegmentWriter.class,
+ Exception.class,
+ "scheduledFlushException");
+ private volatile Exception scheduledFlushException = null;
private boolean enforceLock = true;
private CompletableFuture<Void> closeFuture = null;
private final boolean enableRecordCounts;
@@ -300,12 +317,11 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
@Override
public Number getSample() {
- return outstandingTransmits.get();
+ return
outstandingTransmitsUpdater.get(BKLogSegmentWriter.this);
}
};
transmitOutstandingLogger.registerGauge("requests",
transmitOutstandingGauge);
- outstandingTransmits = new AtomicInteger(0);
this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
this.streamName = streamName;
this.logSegmentMetadataVersion = logSegmentMetadataVersion;
@@ -395,7 +411,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
@VisibleForTesting
void setTransmitResult(int rc) {
- transmitResult.set(rc);
+ transmitResultUpdater.set(this, rc);
}
@VisibleForTesting
@@ -510,7 +526,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
if (null != packet) {
EntryBuffer recordSet = packet.getRecordSet();
numRecords = recordSet.getNumRecords();
- int rc = transmitResult.get();
+ int rc = transmitResultUpdater.get(this);
if (BKException.Code.OK == rc) {
rc = BKException.Code.InterruptedException;
}
@@ -537,13 +553,13 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
closePromise = closeFuture = new CompletableFuture<Void>();
}
- AtomicReference<Throwable> throwExc = new
AtomicReference<Throwable>(null);
+ MutableObject<Throwable> throwExc = new MutableObject<>(null);
closeInternal(abort, throwExc, closePromise);
return closePromise;
}
private void closeInternal(final boolean abort,
- final AtomicReference<Throwable> throwExc,
+ final MutableObject<Throwable> throwExc,
final CompletableFuture<Void> closePromise) {
// clean stats resources
this.transmitOutstandingLogger.unregisterGauge("requests",
transmitOutstandingGauge);
@@ -579,7 +595,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
@Override
public void onFailure(Throwable cause) {
- throwExc.set(cause);
+ throwExc.setValue(cause);
abortTransmitPacketOnClose(abort, throwExc, closePromise);
}
});
@@ -590,12 +606,12 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
private void abortTransmitPacketOnClose(final boolean abort,
- final AtomicReference<Throwable>
throwExc,
+ final MutableObject<Throwable>
throwExc,
final CompletableFuture<Void>
closePromise) {
LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :"
+ " lastDLSN = {} outstandingTransmits = {}
writesPendingTransmit = {}",
new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(),
- outstandingTransmits.get(),
getWritesPendingTransmit()});
+ outstandingTransmitsUpdater.get(this),
getWritesPendingTransmit()});
// Save the current packet to reset, leave a new empty packet to avoid
a race with
// addCompleteDeferredProcessing.
@@ -629,10 +645,10 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
private void closeLedgerOnClose(final boolean abort,
- final AtomicReference<Throwable> throwExc,
+ final MutableObject<Throwable> throwExc,
final CompletableFuture<Void>
closePromise) {
// close the log segment if it isn't in error state, so all the
outstanding addEntry(s) will callback.
- if (null == throwExc.get() && !isLogSegmentInError()) {
+ if (null == throwExc.getValue() && !isLogSegmentInError()) {
// Synchronous closing the ledger handle, if we couldn't close a
ledger handle successfully.
// we should throw the exception to #closeToFinalize, so it would
fail completing a log segment.
entryWriter.asyncClose(new CloseCallback() {
@@ -640,7 +656,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
public void closeComplete(int rc, LedgerHandle lh, Object ctx)
{
if (BKException.Code.OK != rc &&
BKException.Code.LedgerClosedException != rc) {
if (!abort) {
- throwExc.set(new IOException("Failed to close
ledger for "
+ throwExc.setValue(new IOException("Failed to close
ledger for "
+ fullyQualifiedLogSegment + " : " +
BKException.getMessage(rc)));
}
}
@@ -653,17 +669,17 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
private void completeClosePromise(final boolean abort,
- final AtomicReference<Throwable>
throwExc,
+ final MutableObject<Throwable> throwExc,
final CompletableFuture<Void>
closePromise) {
// If add entry failed because of closing ledger above, we don't need
to fail the close operation
- if (!abort && null == throwExc.get() &&
shouldFailCompleteLogSegment()) {
- throwExc.set(new BKTransmitException("Closing an errored stream :
", transmitResult.get()));
+ if (!abort && null == throwExc.getValue() &&
shouldFailCompleteLogSegment()) {
+ throwExc.setValue(new BKTransmitException("Closing an errored
stream : ", transmitResultUpdater.get(this)));
}
- if (null == throwExc.get()) {
+ if (null == throwExc.getValue()) {
FutureUtils.complete(closePromise, null);
} else {
- FutureUtils.completeExceptionally(closePromise, throwExc.get());
+ FutureUtils.completeExceptionally(closePromise,
throwExc.getValue());
}
}
@@ -721,9 +737,9 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
BKException.getMessage(BKException.Code.LedgerClosedException));
}
- if (BKException.Code.OK != transmitResult.get()) {
+ if (BKException.Code.OK != transmitResultUpdater.get(this)) {
// Failfast if the stream already encountered error with safe
retry on the client
- throw new WriteException(fullyQualifiedLogSegment,
BKException.getMessage(transmitResult.get()));
+ throw new WriteException(fullyQualifiedLogSegment,
BKException.getMessage(transmitResultUpdater.get(this)));
}
if (streamEnded) {
@@ -777,12 +793,12 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
boolean isLogSegmentInError() {
- return (transmitResult.get() != BKException.Code.OK);
+ return (transmitResultUpdater.get(this) != BKException.Code.OK);
}
boolean shouldFailCompleteLogSegment() {
- return (transmitResult.get() != BKException.Code.OK)
- && (transmitResult.get() !=
BKException.Code.LedgerClosedException);
+ return (transmitResultUpdater.get(this) != BKException.Code.OK)
+ && (transmitResultUpdater.get(this) !=
BKException.Code.LedgerClosedException);
}
public synchronized CompletableFuture<DLSN> writeInternal(LogRecord record)
@@ -977,23 +993,24 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
}
- void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
- final
AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
+ void scheduleFlushWithDelayIfNeeded(
+ final Callable<?> callable,
+ final AtomicReferenceFieldUpdater<BKLogSegmentWriter,
ScheduledFuture> scheduledFutureRefUpdater) {
final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs -
lastTransmit.elapsed(TimeUnit.MILLISECONDS));
- final ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
+ final ScheduledFuture scheduledFuture =
scheduledFutureRefUpdater.get(this);
if ((null == scheduledFuture) || scheduledFuture.isDone()) {
- scheduledFutureRef.set(scheduler.schedule(new Runnable() {
+ scheduledFutureRefUpdater.set(this, scheduler.schedule(new
Runnable() {
@Override
public void run() {
synchronized (this) {
- scheduledFutureRef.set(null);
+ scheduledFutureRefUpdater.set(BKLogSegmentWriter.this,
null);
try {
callable.call();
// Flush was successful or wasn't needed, the
exception should be unset.
- scheduledFlushException.set(null);
+
scheduledFlushExceptionUpdater.set(BKLogSegmentWriter.this, null);
} catch (Exception exc) {
- scheduledFlushException.set(exc);
+
scheduledFlushExceptionUpdater.set(BKLogSegmentWriter.this, exc);
LOG.error("Delayed flush failed", exc);
}
}
@@ -1017,15 +1034,16 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
checkStateAndTransmit();
return null;
}
- }, transmitSchedFutureRef);
+ }, transmitSchedFutureRefUpdater);
// Timing here is not very important--the last flush failed
and we should
// indicate this to the caller. The next flush may succeed and
unset the
// scheduledFlushException in which case the next write will
succeed (if the caller
// hasn't already closed the writer).
- if (scheduledFlushException.get() != null) {
+ Exception exec = scheduledFlushExceptionUpdater.get(this);
+ if (exec != null) {
throw new FlushException("Last flush encountered an error
while writing data to the backend",
- getLastTxId(), getLastTxIdAcknowledged(),
scheduledFlushException.get());
+ getLastTxId(), getLastTxIdAcknowledged(), exec);
}
}
}
@@ -1069,14 +1087,14 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
checkWriteLock();
// If transmitResult is anything other than
BKException.Code.OK, it means that the
// stream has encountered an error and cannot be written to.
- if (!transmitResult.compareAndSet(BKException.Code.OK,
+ if (!transmitResultUpdater.compareAndSet(this,
BKException.Code.OK,
BKException.Code.OK)) {
LOG.error("Log Segment {} Trying to write to an errored
stream; Error is {}",
fullyQualifiedLogSegment,
- BKException.getMessage(transmitResult.get()));
+
BKException.getMessage(transmitResultUpdater.get(this)));
throw new BKTransmitException("Trying to write to an
errored stream;"
- + " Error code : ("
+ transmitResult.get() + ") "
- + BKException.getMessage(transmitResult.get()),
transmitResult.get());
+ + " Error code : ("
+ transmitResultUpdater.get(this) + ") "
+ +
BKException.getMessage(transmitResultUpdater.get(this)),
transmitResultUpdater.get(this));
}
if (recordSetWriter.getNumRecords() == 0) {
@@ -1107,7 +1125,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
new Object[] {fullyQualifiedLogSegment}, e);
// If a write fails here, we need to set the transmit result
to an error so that
// no future writes go through and violate ordering guarantees.
- transmitResult.set(BKException.Code.WriteException);
+ transmitResultUpdater.set(this,
BKException.Code.WriteException);
if (e instanceof InvalidEnvelopedEntryException) {
alertStatsLogger.raise("Invalid enveloped entry for
segment {} : ", fullyQualifiedLogSegment, e);
throw (InvalidEnvelopedEntryException) e;
@@ -1131,7 +1149,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
lastTransmit.reset().start();
- outstandingTransmits.incrementAndGet();
+ outstandingTransmitsUpdater.incrementAndGet(this);
controlFlushNeeded = false;
return packet.getTransmitFuture();
}
@@ -1145,7 +1163,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
* flush task can determine if there is anything it needs to do.
*/
private synchronized boolean haveDataToTransmit() {
- if (!transmitResult.compareAndSet(BKException.Code.OK,
BKException.Code.OK)) {
+ if (!transmitResultUpdater.compareAndSet(this, BKException.Code.OK,
BKException.Code.OK)) {
// Even if there is data it cannot be transmitted, so effectively
nothing to send
return false;
}
@@ -1156,14 +1174,15 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
@Override
public void addComplete(final int rc, LedgerHandle handle,
final long entryId, final Object ctx) {
- final AtomicReference<Integer> effectiveRC = new
AtomicReference<Integer>(rc);
+ int rcAfterFailPoint = rc;
try {
if
(FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitComplete))
{
- effectiveRC.set(BKException.Code.UnexpectedConditionException);
+ rcAfterFailPoint =
BKException.Code.UnexpectedConditionException;
}
} catch (Exception exc) {
- effectiveRC.set(BKException.Code.UnexpectedConditionException);
+ rcAfterFailPoint = BKException.Code.UnexpectedConditionException;
}
+ final int effectiveRC = rcAfterFailPoint;
// Sanity check to make sure we're receiving these callbacks in order.
if (entryId > -1 && lastEntryId >= entryId) {
@@ -1199,7 +1218,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
addCompleteQueuedTime.registerSuccessfulEvent(
queuedTime.elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
- addCompleteDeferredProcessing(transmitPacket, entryId,
effectiveRC.get());
+ addCompleteDeferredProcessing(transmitPacket, entryId,
effectiveRC);
addCompleteDeferredTime.registerSuccessfulEvent(
deferredTime.elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MILLISECONDS);
@@ -1223,14 +1242,14 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
});
// Race condition if we notify before the addComplete is enqueued.
- transmitPacket.notifyTransmitComplete(effectiveRC.get());
- outstandingTransmits.getAndDecrement();
+ transmitPacket.notifyTransmitComplete(effectiveRC);
+ outstandingTransmitsUpdater.getAndDecrement(this);
} else {
// Notify transmit complete must be called before deferred
processing in the
// sync case since otherwise callbacks in deferred processing may
deadlock.
- transmitPacket.notifyTransmitComplete(effectiveRC.get());
- outstandingTransmits.getAndDecrement();
- addCompleteDeferredProcessing(transmitPacket, entryId,
effectiveRC.get());
+ transmitPacket.notifyTransmitComplete(effectiveRC);
+ outstandingTransmitsUpdater.getAndDecrement(this);
+ addCompleteDeferredProcessing(transmitPacket, entryId,
effectiveRC);
}
}
@@ -1240,17 +1259,17 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
boolean cancelPendingPromises = false;
EntryBuffer recordSet = transmitPacket.getRecordSet();
synchronized (this) {
- if (transmitResult.compareAndSet(BKException.Code.OK, rc)) {
+ if (transmitResultUpdater.compareAndSet(this, BKException.Code.OK,
rc)) {
// If this is the first time we are setting an error code in
the transmitResult then
// we must cancel pending promises; once this error has been
set, more records will not
// be enqueued; they will be failed with WriteException
cancelPendingPromises = (BKException.Code.OK != rc);
} else {
LOG.warn("Log segment {} entryId {}: Tried to set transmit
result to ({}) but is already ({})",
- new Object[] {fullyQualifiedLogSegment, entryId, rc,
transmitResult.get()});
+ new Object[] {fullyQualifiedLogSegment, entryId, rc,
transmitResultUpdater.get(this)});
}
- if (transmitResult.get() != BKException.Code.OK) {
+ if (transmitResultUpdater.get(this) != BKException.Code.OK) {
if (recordSet.hasUserRecords()) {
transmitDataPacketSize.registerFailedEvent(
recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
@@ -1273,14 +1292,14 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
backgroundFlush(true);
return null;
}
- }, immFlushSchedFutureRef);
+ }, immFlushSchedFutureRefUpdater);
}
}
}
}
// update last dlsn before satisifying future
- if (BKException.Code.OK == transmitResult.get()) {
+ if (BKException.Code.OK == transmitResultUpdater.get(this)) {
DLSN lastDLSNInPacket = recordSet.finalizeTransmit(
logSegmentSequenceNumber, entryId);
if (recordSet.hasUserRecords()) {
@@ -1291,10 +1310,10 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
}
- if (BKException.Code.OK == transmitResult.get()) {
+ if (BKException.Code.OK == transmitResultUpdater.get(this)) {
recordSet.completeTransmit(logSegmentSequenceNumber, entryId);
} else {
-
recordSet.abortTransmit(Utils.transmitException(transmitResult.get()));
+
recordSet.abortTransmit(Utils.transmitException(transmitResultUpdater.get(this)));
}
if (cancelPendingPromises) {
@@ -1308,7 +1327,7 @@ class BKLogSegmentWriter implements LogSegmentWriter,
AddCallback, Runnable, Siz
}
packetCurrentSaved.getRecordSet().abortTransmit(
new WriteCancelledException(streamName,
- Utils.transmitException(transmitResult.get())));
+
Utils.transmitException(transmitResultUpdater.get(this))));
}
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index d62c8e7..898c113 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -30,9 +30,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -220,12 +220,12 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
*/
boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) {
if (BKException.Code.OK == rc) {
- numReadErrors.set(0);
+ numReadErrorsUpdater.set(BKLogSegmentEntryReader.this, 0);
return true;
}
if (BKException.Code.BookieHandleNotAvailableException == rc
|| (isLongPoll &&
BKException.Code.NoSuchLedgerExistsException == rc)) {
- int numErrors = Math.max(1, numReadErrors.incrementAndGet());
+ int numErrors = Math.max(1,
numReadErrorsUpdater.incrementAndGet(BKLogSegmentEntryReader.this));
int nextReadBackoffTime = Math.min(numErrors *
readAheadWaitTime, maxReadBackoffTime);
scheduler.scheduleOrdered(
getSegment().getLogSegmentId(),
@@ -301,15 +301,21 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
private final List<LedgerHandle> openLedgerHandles;
private CacheEntry outstandingLongPoll;
private long nextEntryId;
- private final AtomicReference<Throwable> lastException = new
AtomicReference<Throwable>(null);
- private final AtomicLong scheduleCount = new AtomicLong(0);
+ private static final AtomicReferenceFieldUpdater<BKLogSegmentEntryReader,
Throwable> lastExceptionUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(BKLogSegmentEntryReader.class,
Throwable.class, "lastException");
+ private volatile Throwable lastException = null;
+ private static final AtomicLongFieldUpdater<BKLogSegmentEntryReader>
scheduleCountUpdater =
+ AtomicLongFieldUpdater.newUpdater(BKLogSegmentEntryReader.class,
"scheduleCount");
+ private volatile long scheduleCount = 0L;
private volatile boolean hasCaughtupOnInprogress = false;
private final CopyOnWriteArraySet<StateChangeListener>
stateChangeListeners =
new CopyOnWriteArraySet<StateChangeListener>();
// read retries
private int readAheadWaitTime;
private final int maxReadBackoffTime;
- private final AtomicInteger numReadErrors = new AtomicInteger(0);
+ private static final AtomicIntegerFieldUpdater<BKLogSegmentEntryReader>
numReadErrorsUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentEntryReader.class,
"numReadErrors");
+ private volatile int numReadErrors = 0;
private final boolean skipBrokenEntries;
// readahead cache
int cachedEntries = 0;
@@ -493,8 +499,9 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
//
private boolean checkClosedOrInError() {
- if (null != lastException.get()) {
- cancelAllPendingReads(lastException.get());
+ Throwable cause = lastExceptionUpdater.get(this);
+ if (null != cause) {
+ cancelAllPendingReads(cause);
return true;
}
return false;
@@ -507,7 +514,7 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
* @param isBackground is the reader set exception by background reads or
foreground reads
*/
private void completeExceptionally(Throwable throwable, boolean
isBackground) {
- lastException.compareAndSet(null, throwable);
+ lastExceptionUpdater.compareAndSet(this, null, throwable);
if (isBackground) {
notifyReaders();
}
@@ -662,7 +669,7 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
final PendingReadRequest readRequest = new
PendingReadRequest(numEntries);
if (checkClosedOrInError()) {
- readRequest.completeExceptionally(lastException.get());
+ readRequest.completeExceptionally(lastExceptionUpdater.get(this));
} else {
boolean wasQueueEmpty;
synchronized (readQueue) {
@@ -682,7 +689,7 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
return;
}
- long prevCount = scheduleCount.getAndIncrement();
+ long prevCount = scheduleCountUpdater.getAndIncrement(this);
if (0 == prevCount) {
scheduler.submitOrdered(getSegment().getLogSegmentId(), this);
}
@@ -693,7 +700,7 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
*/
@Override
public void safeRun() {
- long scheduleCountLocal = scheduleCount.get();
+ long scheduleCountLocal = scheduleCountUpdater.get(this);
while (true) {
PendingReadRequest nextRequest = null;
synchronized (readQueue) {
@@ -702,14 +709,14 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
// if read queue is empty, nothing to read, return
if (null == nextRequest) {
- scheduleCount.set(0L);
+ scheduleCountUpdater.set(this, 0L);
return;
}
// if the oldest pending promise is interrupted then we must
// mark the reader in error and abort all pending reads since
// we don't know the last consumed read
- if (null == lastException.get()) {
+ if (null == lastExceptionUpdater.get(this)) {
if (nextRequest.getPromise().isCancelled()) {
completeExceptionally(new
DLInterruptedException("Interrupted on reading log segment "
+ getSegment() + " : " +
nextRequest.getPromise().isCancelled()), false);
@@ -745,7 +752,7 @@ public class BKLogSegmentEntryReader implements
SafeRunnable, LogSegmentEntryRea
if (0 == scheduleCountLocal) {
return;
}
- scheduleCountLocal = scheduleCount.decrementAndGet();
+ scheduleCountLocal =
scheduleCountUpdater.decrementAndGet(this);
}
}
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
index c680000..5a56517 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
@@ -20,7 +20,7 @@ package org.apache.distributedlog.impl.subscription;
import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.ZooKeeperClient;
@@ -42,7 +42,9 @@ public class ZKSubscriptionStateStore implements
SubscriptionStateStore {
private final ZooKeeperClient zooKeeperClient;
private final String zkPath;
- private AtomicReference<DLSN> lastCommittedPosition = new
AtomicReference<DLSN>(null);
+ private static final AtomicReferenceFieldUpdater<ZKSubscriptionStateStore,
DLSN> lastCommittedPositionUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(ZKSubscriptionStateStore.class,
DLSN.class, "lastCommittedPosition");
+ private volatile DLSN lastCommittedPosition = null;
public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String
zkPath) {
this.zooKeeperClient = zooKeeperClient;
@@ -58,8 +60,9 @@ public class ZKSubscriptionStateStore implements
SubscriptionStateStore {
*/
@Override
public CompletableFuture<DLSN> getLastCommitPosition() {
- if (null != lastCommittedPosition.get()) {
- return FutureUtils.value(lastCommittedPosition.get());
+ DLSN dlsn = lastCommittedPositionUpdater.get(this);
+ if (null != dlsn) {
+ return FutureUtils.value(dlsn);
} else {
return getLastCommitPositionFromZK();
}
@@ -105,9 +108,10 @@ public class ZKSubscriptionStateStore implements
SubscriptionStateStore {
*/
@Override
public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition) {
- if (null == lastCommittedPosition.get()
- || (newPosition.compareTo(lastCommittedPosition.get()) > 0)) {
- lastCommittedPosition.set(newPosition);
+ DLSN dlsn = lastCommittedPositionUpdater.get(this);
+ if (null == dlsn
+ || (newPosition.compareTo(dlsn) > 0)) {
+ lastCommittedPositionUpdater.set(this, newPosition);
return
Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient,
zkPath, newPosition.serialize().getBytes(Charsets.UTF_8),
zooKeeperClient.getDefaultACL(),
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 81d721b..7948084 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -92,7 +93,9 @@ public class ZKDistributedLock implements LockListener,
DistributedLock {
private CompletableFuture<Void> closeFuture = null;
// A counter to track how many re-acquires happened during a lock's life
cycle.
- private final AtomicInteger reacquireCount = new AtomicInteger(0);
+ private static final AtomicIntegerFieldUpdater<ZKDistributedLock>
reacquireCountUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(ZKDistributedLock.class,
"reacquireCount");
+ private volatile int reacquireCount = 0;
private final StatsLogger lockStatsLogger;
private final OpStatsLogger acquireStats;
private final OpStatsLogger reacquireStats;
@@ -323,7 +326,7 @@ public class ZKDistributedLock implements LockListener,
DistributedLock {
@VisibleForTesting
int getReacquireCount() {
- return reacquireCount.get();
+ return reacquireCountUpdater.get(this);
}
@VisibleForTesting
@@ -511,7 +514,7 @@ public class ZKDistributedLock implements LockListener,
DistributedLock {
}
});
}
- reacquireCount.incrementAndGet();
+ reacquireCountUpdater.incrementAndGet(this);
internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0,
lockPromise);
return lockPromise;
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index f018e36..3d79336 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -31,7 +31,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -274,7 +274,9 @@ class ZKSessionLock implements SessionLock {
private String currentNode;
private String watchedNode;
private LockWatcher watcher;
- private final AtomicInteger epoch = new AtomicInteger(0);
+ private static final AtomicIntegerFieldUpdater<ZKSessionLock> epochUpdater
=
+ AtomicIntegerFieldUpdater.newUpdater(ZKSessionLock.class, "epoch");
+ private volatile int epoch = 0;
private final OrderedScheduler lockStateExecutor;
private LockListener lockListener = null;
private final long lockOpTimeout;
@@ -358,9 +360,8 @@ class ZKSessionLock implements SessionLock {
return this.lockPath;
}
- @VisibleForTesting
- AtomicInteger getEpoch() {
- return epoch;
+ int getEpoch() {
+ return epochUpdater.get(this);
}
@VisibleForTesting
@@ -394,7 +395,7 @@ class ZKSessionLock implements SessionLock {
lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
@Override
public void safeRun() {
- if (ZKSessionLock.this.epoch.get() == lockEpoch) {
+ if (getEpoch() == lockEpoch) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} executing lock action '{}' under epoch
{} for lock {}",
new Object[]{lockId, func.getActionName(),
lockEpoch, lockPath});
@@ -409,7 +410,7 @@ class ZKSessionLock implements SessionLock {
LOG.trace("{} skipped executing lock action '{}' for
lock {},"
+ " since epoch is changed from {} to
{}.",
new Object[]{lockId, func.getActionName(),
- lockPath, lockEpoch,
ZKSessionLock.this.epoch.get()});
+ lockPath, lockEpoch, getEpoch()});
}
}
}
@@ -433,7 +434,7 @@ class ZKSessionLock implements SessionLock {
lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
@Override
public void safeRun() {
- int currentEpoch = ZKSessionLock.this.epoch.get();
+ int currentEpoch = getEpoch();
if (currentEpoch == lockEpoch) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} executed lock action '{}' under epoch {}
for lock {}",
@@ -656,7 +657,7 @@ class ZKSessionLock implements SessionLock {
return false;
}
// current owner is itself
- final int curEpoch = epoch.incrementAndGet();
+ final int curEpoch = epochUpdater.incrementAndGet(this);
executeLockAction(curEpoch, new LockAction() {
@Override
public void execute() {
@@ -736,7 +737,7 @@ class ZKSessionLock implements SessionLock {
* promise to satisfy with current lock owner.
*/
private void asyncTryLockWithoutCleanup(final boolean wait, final
CompletableFuture<String> promise) {
- executeLockAction(epoch.get(), new LockAction() {
+ executeLockAction(getEpoch(), new LockAction() {
@Override
public void execute() {
if (!lockState.inState(State.INIT)) {
@@ -746,7 +747,7 @@ class ZKSessionLock implements SessionLock {
}
lockState.transition(State.PREPARING);
- final int curEpoch = epoch.incrementAndGet();
+ final int curEpoch =
epochUpdater.incrementAndGet(ZKSessionLock.this);
watcher = new LockWatcher(curEpoch);
// register watcher for session expires
zkClient.register(watcher);
@@ -913,7 +914,7 @@ class ZKSessionLock implements SessionLock {
if (LOG.isDebugEnabled()) {
LOG.debug("Notify lock waiters on {} at {} : watcher epoch {},
lock epoch {}",
new Object[] { lockPath, System.currentTimeMillis(),
- lockEpoch, ZKSessionLock.this.epoch.get() });
+ lockEpoch, getEpoch() });
}
acquireFuture.complete(true);
}
@@ -924,7 +925,7 @@ class ZKSessionLock implements SessionLock {
private void unlockInternal(final CompletableFuture<Void> promise) {
// already closed or expired, nothing to cleanup
- this.epoch.incrementAndGet();
+ this.epochUpdater.incrementAndGet(this);
if (null != watcher) {
this.zkClient.unregister(watcher);
}
@@ -1026,7 +1027,7 @@ class ZKSessionLock implements SessionLock {
}
// increment epoch to avoid any ongoing locking action
- ZKSessionLock.this.epoch.incrementAndGet();
+ epochUpdater.incrementAndGet(ZKSessionLock.this);
// if session expired, just notify the waiter. as the lock
acquire doesn't succeed.
// we don't even need to clean up the lock as the znode will
disappear after session expired
@@ -1326,7 +1327,7 @@ class ZKSessionLock implements SessionLock {
@Override
public void process(WatchedEvent event) {
LOG.debug("Received event {} from lock {} at {} : watcher epoch
{}, lock epoch {}.",
- new Object[] {event, lockPath, System.currentTimeMillis(),
epoch, ZKSessionLock.this.epoch.get() });
+ new Object[] {event, lockPath, System.currentTimeMillis(),
epoch, getEpoch() });
if (event.getType() == Watcher.Event.EventType.None) {
switch (event.getState()) {
case SyncConnected:
@@ -1334,7 +1335,7 @@ class ZKSessionLock implements SessionLock {
case Expired:
LOG.info("Session {} is expired for lock {} at {} :
watcher epoch {}, lock epoch {}.",
new Object[] { lockId.getRight(), lockPath,
System.currentTimeMillis(),
- epoch, ZKSessionLock.this.epoch.get()
});
+ epoch, getEpoch() });
handleSessionExpired(epoch);
break;
default:
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
index b5bc499..5a076e3 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
@@ -18,7 +18,7 @@
package org.apache.distributedlog.util;
import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
@@ -41,7 +41,9 @@ public class SimplePermitLimiter implements PermitLimiter {
final Counter acquireFailureCounter;
final OpStatsLogger permitsMetric;
- final AtomicInteger permits;
+ private static final AtomicIntegerFieldUpdater<SimplePermitLimiter>
permitsUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(SimplePermitLimiter.class,
"permits");
+ volatile int permits = 0;
final int permitsMax;
final boolean darkmode;
final Feature disableWriteLimitFeature;
@@ -51,7 +53,6 @@ public class SimplePermitLimiter implements PermitLimiter {
public SimplePermitLimiter(boolean darkmode, int permitsMax, StatsLogger
statsLogger,
boolean singleton, Feature
disableWriteLimitFeature) {
- this.permits = new AtomicInteger(0);
this.permitsMax = permitsMax;
this.darkmode = darkmode;
this.disableWriteLimitFeature = disableWriteLimitFeature;
@@ -66,7 +67,7 @@ public class SimplePermitLimiter implements PermitLimiter {
}
@Override
public Number getSample() {
- return permits.get();
+ return permitsUpdater.get(SimplePermitLimiter.this);
}
};
this.permitsGaugeLabel = "permits";
@@ -82,19 +83,19 @@ public class SimplePermitLimiter implements PermitLimiter {
@Override
public boolean acquire() {
- permitsMetric.registerSuccessfulValue(permits.get());
- if (permits.incrementAndGet() <= permitsMax || isDarkmode()) {
+ permitsMetric.registerSuccessfulValue(permitsUpdater.get(this));
+ if (permitsUpdater.incrementAndGet(this) <= permitsMax ||
isDarkmode()) {
return true;
} else {
acquireFailureCounter.inc();
- permits.decrementAndGet();
+ permitsUpdater.decrementAndGet(this);
return false;
}
}
@Override
public void release(int permitsToRelease) {
- permits.addAndGet(-permitsToRelease);
+ permitsUpdater.addAndGet(this, -permitsToRelease);
}
@Override
@@ -104,7 +105,7 @@ public class SimplePermitLimiter implements PermitLimiter {
@VisibleForTesting
public int getPermits() {
- return permits.get();
+ return permitsUpdater.get(this);
}
public void unregisterGauge() {
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
index bd731a6..211d10c 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
@@ -21,7 +21,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -44,14 +44,14 @@ public class LimitedPermitManager implements PermitManager,
Runnable, Watcher {
ALLOWED, DISALLOWED, DISABLED
}
- class EpochPermit implements Permit {
+ static class EpochPermit implements Permit {
final PermitState state;
final int epoch;
- EpochPermit(PermitState state) {
+ EpochPermit(PermitState state, int epoch) {
this.state = state;
- this.epoch = LimitedPermitManager.this.epoch.get();
+ this.epoch = epoch;
}
int getEpoch() {
@@ -69,7 +69,9 @@ public class LimitedPermitManager implements PermitManager,
Runnable, Watcher {
final int period;
final TimeUnit timeUnit;
final ScheduledExecutorService executorService;
- final AtomicInteger epoch = new AtomicInteger(0);
+ private static final AtomicIntegerFieldUpdater<LimitedPermitManager>
epochUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(LimitedPermitManager.class,
"epoch");
+ volatile int epoch = 0;
private StatsLogger statsLogger = null;
private Gauge<Number> outstandingGauge = null;
@@ -106,13 +108,13 @@ public class LimitedPermitManager implements
PermitManager, Runnable, Watcher {
@Override
public synchronized Permit acquirePermit() {
if (!enablePermits) {
- return new EpochPermit(PermitState.DISABLED);
+ return new EpochPermit(PermitState.DISABLED,
epochUpdater.get(this));
}
if (null != semaphore) {
- return semaphore.tryAcquire() ? new
EpochPermit(PermitState.ALLOWED) :
- new EpochPermit(PermitState.DISALLOWED);
+ return semaphore.tryAcquire() ? new
EpochPermit(PermitState.ALLOWED, epochUpdater.get(this)) :
+ new EpochPermit(PermitState.DISALLOWED,
epochUpdater.get(this));
} else {
- return new EpochPermit(PermitState.ALLOWED);
+ return new EpochPermit(PermitState.ALLOWED,
epochUpdater.get(this));
}
}
@@ -138,9 +140,10 @@ public class LimitedPermitManager implements
PermitManager, Runnable, Watcher {
if (!(permit instanceof EpochPermit)) {
return false;
}
- if (epoch.getAndIncrement() == ((EpochPermit) permit).getEpoch()) {
+ int epoch = epochUpdater.getAndIncrement(this);
+ if (epoch == ((EpochPermit) permit).getEpoch()) {
this.enablePermits = false;
- LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits,
epoch.get());
+ LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits,
epoch);
return true;
} else {
return false;
@@ -159,9 +162,9 @@ public class LimitedPermitManager implements PermitManager,
Runnable, Watcher {
}
synchronized void forceSetAllowPermits(boolean allowPermits) {
- epoch.getAndIncrement();
+ int epoch = epochUpdater.getAndIncrement(this);
this.enablePermits = allowPermits;
- LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits,
epoch.get());
+ LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch);
}
@Override
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
index fe8eed4..8c350c9 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
@@ -21,7 +21,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.ZooKeeperClient;
@@ -89,7 +89,7 @@ public class ZKWatcherManager implements Watcher {
private static final String numChildWatchesGauageLabel =
"num_child_watches";
protected final ConcurrentMap<String, Set<Watcher>> childWatches;
- protected final AtomicInteger allWatchesGauge;
+ protected final LongAdder allWatchesGauge;
private ZKWatcherManager(String name,
ZooKeeperClient zkc,
@@ -100,7 +100,7 @@ public class ZKWatcherManager implements Watcher {
// watches
this.childWatches = new ConcurrentHashMap<String, Set<Watcher>>();
- this.allWatchesGauge = new AtomicInteger(0);
+ this.allWatchesGauge = new LongAdder();
// stats
totalWatchesGauge = new Gauge<Number>() {
@@ -111,7 +111,7 @@ public class ZKWatcherManager implements Watcher {
@Override
public Number getSample() {
- return allWatchesGauge.get();
+ return allWatchesGauge.sum();
}
};
this.statsLogger.registerGauge(totalWatchesGauageLabel,
totalWatchesGauge);
@@ -141,7 +141,7 @@ public class ZKWatcherManager implements Watcher {
synchronized (watchers) {
if (childWatches.get(path) == watchers) {
if (watchers.add(watcher)) {
- allWatchesGauge.incrementAndGet();
+ allWatchesGauge.increment();
}
} else {
logger.warn("Watcher set for path {} has been changed while
registering child watcher {}.",
@@ -160,7 +160,7 @@ public class ZKWatcherManager implements Watcher {
}
synchronized (watchers) {
if (watchers.remove(watcher)) {
- allWatchesGauge.decrementAndGet();
+ allWatchesGauge.decrement();
} else {
logger.warn("Remove a non-registered child watcher {} from
path {}", watcher, path);
}
@@ -212,7 +212,7 @@ public class ZKWatcherManager implements Watcher {
}
private void handleKeeperStateEvent(WatchedEvent event) {
- Set<Watcher> savedAllWatches = new
HashSet<Watcher>(allWatchesGauge.get());
+ Set<Watcher> savedAllWatches = new HashSet<Watcher>((int)
allWatchesGauge.sum());
for (Set<Watcher> watcherSet : childWatches.values()) {
synchronized (watcherSet) {
savedAllWatches.addAll(watcherSet);
diff --git
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index ad1ebc1..901d2a0 100644
---
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -221,7 +221,7 @@ public class TestZKSessionLock extends
ZooKeeperClusterTestCase {
// lock action would be executed in same epoch
final CountDownLatch latch1 = new CountDownLatch(1);
- lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
+ lock.executeLockAction(lock.getEpoch(), new LockAction() {
@Override
public void execute() {
counter.incrementAndGet();
@@ -238,7 +238,7 @@ public class TestZKSessionLock extends
ZooKeeperClusterTestCase {
// lock action would not be executed in same epoch
final CountDownLatch latch2 = new CountDownLatch(1);
- lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
+ lock.executeLockAction(lock.getEpoch() + 1, new LockAction() {
@Override
public void execute() {
counter.incrementAndGet();
@@ -249,7 +249,7 @@ public class TestZKSessionLock extends
ZooKeeperClusterTestCase {
return "increment2";
}
});
- lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
+ lock.executeLockAction(lock.getEpoch(), new LockAction() {
@Override
public void execute() {
latch2.countDown();
@@ -265,7 +265,7 @@ public class TestZKSessionLock extends
ZooKeeperClusterTestCase {
// lock action would not be executed in same epoch and promise would
be satisfied with exception
CompletableFuture<Void> promise = new CompletableFuture<Void>();
- lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
+ lock.executeLockAction(lock.getEpoch() + 1, new LockAction() {
@Override
public void execute() {
counter.incrementAndGet();
diff --git
a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
index e6d076e..b0b5bb1 100644
---
a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
+++
b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
@@ -25,7 +25,7 @@ import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -52,7 +52,9 @@ class DLOutputStream extends OutputStream {
private long writePos = 0L;
// state
- private final AtomicReference<Throwable> exception = new
AtomicReference<>(null);
+ private static final AtomicReferenceFieldUpdater<DLOutputStream,
Throwable> exceptionUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class,
Throwable.class, "exception");
+ private volatile Throwable exception = null;
DLOutputStream(DistributedLogManager dlm,
AsyncLogWriter writer) {
@@ -83,7 +85,7 @@ class DLOutputStream extends OutputStream {
}
private synchronized void write(ByteBuf buf) throws IOException {
- Throwable cause = exception.get();
+ Throwable cause = exceptionUpdater.get(this);
if (null != cause) {
if (cause instanceof IOException) {
throw (IOException) cause;
@@ -104,7 +106,7 @@ class DLOutputStream extends OutputStream {
@Override
public void onFailure(Throwable cause) {
- exception.compareAndSet(null, cause);
+ exceptionUpdater.compareAndSet(DLOutputStream.this, null,
cause);
}
});
}
--
To stop receiving notification emails like this one, please contact
[email protected].