Simplify the state transitions of a stream
* the stream is created in the INITIALIZING state
* when the stream is started, it begins the transition from INITIALIZING to
INITIALIZED
* it serves stream operations once the stream is INITIALIZED
* it is moved to ERROR when it encounters an exception.
* the stream is closed when a service operation times out or when it encounters
any exception. It is first removed from the acquired mapping.
* the stream is then removed from the cached mapping, after a delay that depends on the probation time.
RB_ID=848047
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f19e7564
Tree:
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f19e7564
Diff:
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f19e7564
Branch: refs/heads/merge/DL-98
Commit: f19e7564ff4a1ec1b5d6f2683db190d739df99bb
Parents: 0a18f56
Author: Leigh Stewart <[email protected]>
Authored: Mon Dec 12 16:49:26 2016 -0800
Committer: Sijie Guo <[email protected]>
Committed: Mon Dec 12 16:49:26 2016 -0800
----------------------------------------------------------------------
.../service/DistributedLogServiceImpl.java | 27 +-
.../service/stream/StreamImpl.java | 550 +++++++------------
.../service/stream/StreamManager.java | 5 +-
.../service/stream/StreamManagerImpl.java | 15 +-
.../service/TestDistributedLogService.java | 20 +-
5 files changed, 222 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 751e972..3a9b904 100644
---
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -90,7 +90,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -378,7 +377,7 @@ public class DistributedLogServiceImpl implements
DistributedLogService.ServiceI
// if it is closed, we would not acquire stream again.
return null;
}
- writer = streamManager.getOrCreateStream(stream);
+ writer = streamManager.getOrCreateStream(stream, true);
} finally {
closeLock.readLock().unlock();
}
@@ -631,26 +630,6 @@ public class DistributedLogServiceImpl implements
DistributedLogService.ServiceI
logger.info("Released KeepAlive Latch. Main thread will shut the
service down.");
}
- @VisibleForTesting
- java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) {
- closeLock.readLock().lock();
- try {
- if (serverStatus != ServerStatus.WRITE_AND_ACCEPT) {
- return null;
- } else if (delayMs > 0) {
- return scheduler.schedule(runnable, delayMs,
TimeUnit.MILLISECONDS);
- } else {
- return scheduler.submit(runnable);
- }
- } catch (RejectedExecutionException ree) {
- logger.error("Failed to schedule task {} in {} ms : ",
- new Object[] { runnable, delayMs, ree });
- return null;
- } finally {
- closeLock.readLock().unlock();
- }
- }
-
// Test methods.
private DynamicDistributedLogConfiguration getDynConf(String streamName) {
@@ -664,8 +643,8 @@ public class DistributedLogServiceImpl implements
DistributedLogService.ServiceI
}
@VisibleForTesting
- Stream newStream(String name) {
- return streamFactory.create(name, getDynConf(name), streamManager);
+ Stream newStream(String name) throws IOException {
+ return streamManager.getOrCreateStream(name, false);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 1204d39..3d5b9e7 100644
---
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -26,7 +26,6 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
import com.twitter.distributedlog.exceptions.OverCapacityException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.exceptions.StreamNotReadyException;
@@ -70,24 +69,23 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StreamImpl implements Stream {
static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
+ /**
+ * The status of the stream.
+ *
+ * The status change of the stream should just go in one direction. If a
stream hits
+ * any error, the stream should be put in error state. If a stream is in
error state,
+ * it should be removed and not reused anymore.
+ */
public static enum StreamStatus {
UNINITIALIZED(-1),
INITIALIZING(0),
INITIALIZED(1),
- // if a stream is in failed state, it could be retried immediately.
- // a stream will be put in failed state when encountered any stream
exception.
- FAILED(-2),
- // if a stream is in backoff state, it would backoff for a while.
- // a stream will be put in backoff state when failed to acquire the
ownership.
- BACKOFF(-3),
CLOSING(-4),
CLOSED(-5),
// if a stream is in error state, it should be abort during closing.
@@ -112,26 +110,15 @@ public class StreamImpl implements Stream {
private final Partition partition;
private DistributedLogManager manager;
- // A write has been attempted since the last stream acquire.
- private volatile boolean writeSinceLastAcquire = false;
private volatile AsyncLogWriter writer;
private volatile StreamStatus status;
private volatile String owner;
private volatile Throwable lastException;
- private volatile boolean running = true;
- private volatile boolean suspended = false;
private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>();
private final Promise<Void> closePromise = new Promise<Void>();
private final Object txnLock = new Object();
private final TimeSequencer sequencer = new TimeSequencer();
- // last acquire time
- private final Stopwatch lastAcquireWatch = Stopwatch.createUnstarted();
- // last acquire failure time
- private final Stopwatch lastAcquireFailureWatch =
Stopwatch.createUnstarted();
- private final long nextAcquireWaitTimeMs;
- private ScheduledFuture<?> tryAcquireScheduledFuture = null;
- private long scheduledAcquireDelayMs = 0L;
private final StreamRequestLimiter limiter;
private final DynamicDistributedLogConfiguration dynConf;
private final DistributedLogConfiguration dlConfig;
@@ -165,7 +152,7 @@ public class StreamImpl implements Stream {
new ConcurrentHashMap<String, Counter>();
// Since we may create and discard streams at initialization if there's a
race,
- // must not do any expensive intialization here (particularly any locking
or
+ // must not do any expensive initialization here (particularly any locking
or
// significant resource allocation etc.).
StreamImpl(final String name,
final Partition partition,
@@ -189,7 +176,6 @@ public class StreamImpl implements Stream {
this.partition = partition;
this.status = StreamStatus.UNINITIALIZED;
this.lastException = new IOException("Fail to write record to stream "
+ name);
- this.nextAcquireWaitTimeMs =
dlConfig.getZKSessionTimeoutMilliseconds() * 3 / 5;
this.streamConfigProvider = streamConfigProvider;
this.dlNamespace = dlNamespace;
this.featureRateLimitDisabled = featureProvider.getFeature(
@@ -275,54 +261,16 @@ public class StreamImpl implements Stream {
return String.format("Stream:%s, %s, %s Status:%s", name, manager,
writer, status);
}
- // schedule stream acquistion
- private void tryAcquireStreamOnce() {
- if (!running) {
- return;
- }
-
- boolean needAcquire = false;
- boolean checkNextTime = false;
- synchronized (this) {
- switch (this.status) {
- case INITIALIZING:
- streamManager.notifyReleased(this);
- needAcquire = true;
- break;
- case FAILED:
- this.status = StreamStatus.INITIALIZING;
- streamManager.notifyReleased(this);
- needAcquire = true;
- break;
- case BACKOFF:
- // We may end up here after timeout on streamLock. To avoid
acquire on every timeout
- // we should only try again if a write has been attempted
since the last acquire
- // attempt. If we end up here because the request handler woke
us up, the flag will
- // be set and we will try to acquire as intended.
- if (writeSinceLastAcquire) {
- this.status = StreamStatus.INITIALIZING;
- streamManager.notifyReleased(this);
- needAcquire = true;
- } else {
- checkNextTime = true;
- }
- break;
- default:
- break;
- }
- }
- if (needAcquire) {
- lastAcquireWatch.reset().start();
- acquireStream().addEventListener(new
FutureEventListener<Boolean>() {
+ @Override
+ public void start() {
+ // acquire the stream
+ acquireStream().addEventListener(new FutureEventListener<Boolean>() {
@Override
public void onSuccess(Boolean success) {
- synchronized (StreamImpl.this) {
- scheduledAcquireDelayMs = 0L;
- tryAcquireScheduledFuture = null;
- }
if (!success) {
- // schedule acquire in nextAcquireWaitTimeMs
- scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
+ // failed to acquire the stream. set the stream in
error status and close it.
+ setStreamInErrorStatus();
+ requestClose("Failed to acquire the ownership");
}
}
@@ -330,65 +278,40 @@ public class StreamImpl implements Stream {
public void onFailure(Throwable cause) {
// unhandled exceptions
logger.error("Stream {} threw unhandled exception : ",
name, cause);
+ // failed to acquire the stream. set the stream in error
status and close it.
setStreamInErrorStatus();
requestClose("Unhandled exception");
}
});
- } else if (StreamStatus.isUnavailable(status)) {
- // if the stream is unavailable, stop the thread and close the
stream
- requestClose("Stream is unavailable anymore");
- } else if (StreamStatus.INITIALIZED != status &&
lastAcquireWatch.elapsed(TimeUnit.HOURS) > 2) {
- // if the stream isn't in initialized state and no writes coming
in, then close the stream
- requestClose("Stream not used anymore");
- } else if (checkNextTime) {
- synchronized (StreamImpl.this) {
- scheduledAcquireDelayMs = 0L;
- tryAcquireScheduledFuture = null;
- }
- // schedule acquire in nextAcquireWaitTimeMs
- scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
- }
}
- private synchronized void scheduleTryAcquireOnce(long delayMs) {
- if (null != tryAcquireScheduledFuture) {
- if (delayMs <= 0) {
- if (scheduledAcquireDelayMs <= 0L ||
- (scheduledAcquireDelayMs > 0L
- && !tryAcquireScheduledFuture.cancel(false))) {
- return;
- }
- // if the scheduled one could be cancelled, re-submit one
- } else {
- return;
+ //
+ // Stats Operations
+ //
+
+ void countException(Throwable t, StatsLogger streamExceptionLogger) {
+ String exceptionName = null == t ? "null" : t.getClass().getName();
+ Counter counter = exceptionCounters.get(exceptionName);
+ if (null == counter) {
+ counter = exceptionStatLogger.getCounter(exceptionName);
+ Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName,
counter);
+ if (null != oldCounter) {
+ counter = oldCounter;
}
}
- tryAcquireScheduledFuture = schedule(new Runnable() {
- @Override
- public void run() {
- tryAcquireStreamOnce();
- }
- }, delayMs);
- scheduledAcquireDelayMs = delayMs;
+ counter.inc();
+ streamExceptionLogger.getCounter(exceptionName).inc();
}
- @Override
- public void start() {
- scheduleTryAcquireOnce(0);
+ boolean isCriticalException(Throwable cause) {
+ return !(cause instanceof OwnershipAcquireFailedException);
}
- ScheduledFuture<?> schedule(Runnable runnable, long delayMs) {
- if (!running) {
- return null;
- }
- try {
- return scheduler.schedule(name, runnable, delayMs,
TimeUnit.MILLISECONDS);
- } catch (RejectedExecutionException ree) {
- logger.error("Failed to schedule task {} in {} ms : ",
- new Object[] { runnable, delayMs, ree });
- return null;
- }
- }
+ //
+ // Service Timeout:
+ // - schedule a timeout function to handle operation timeouts: {@link
#handleServiceTimeout(String)}
+ // - if the operation is completed within timeout period, cancel the
timeout.
+ //
void scheduleTimeout(final StreamOp op) {
final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
@@ -418,12 +341,14 @@ public class StreamImpl implements Stream {
* stream off the proxy for a period of time, hopefully long enough for the
* issues to be resolved, or for whoop to kick in and kill the shard.
*/
- synchronized void handleServiceTimeout(String reason) {
- if (StreamStatus.isUnavailable(status)) {
- return;
+ void handleServiceTimeout(String reason) {
+ synchronized (this) {
+ if (StreamStatus.isUnavailable(status)) {
+ return;
+ }
+ // Mark stream in error state
+ setStreamInErrorStatus();
}
- // Mark stream in error state
- setStreamInErrorStatus();
// Async close request, and schedule eviction when its done.
Future<Void> closeFuture = requestClose(reason, false /* dont remove
*/);
@@ -436,6 +361,10 @@ public class StreamImpl implements Stream {
});
}
+ //
+ // Submit the operation to the stream.
+ //
+
/**
* Execute the StreamOp. If reacquire is needed, this may initiate
reacquire and queue the op for
* execution once complete.
@@ -445,9 +374,6 @@ public class StreamImpl implements Stream {
*/
@Override
public void submit(StreamOp op) {
- // Let stream acquire thread know a write has been attempted.
- writeSinceLastAcquire = true;
-
try {
limiter.apply(op);
} catch (OverCapacityException ex) {
@@ -460,36 +386,28 @@ public class StreamImpl implements Stream {
scheduleTimeout(op);
}
- boolean notifyAcquireThread = false;
boolean completeOpNow = false;
boolean success = true;
if (StreamStatus.isUnavailable(status)) {
// Stream is closed, fail the op immediately
op.fail(new StreamUnavailableException("Stream " + name + " is
closed."));
return;
- } if (StreamStatus.INITIALIZED == status && writer != null) {
+ } else if (StreamStatus.INITIALIZED == status && writer != null) {
completeOpNow = true;
success = true;
} else {
synchronized (this) {
if (StreamStatus.isUnavailable(status)) {
- // complete the write op as {@link #executeOp(op,
success)} will handle closed case.
- completeOpNow = true;
- success = true;
+ // Stream is closed, fail the op immediately
+ op.fail(new StreamUnavailableException("Stream " + name +
" is closed."));
+ return;
} if (StreamStatus.INITIALIZED == status) {
completeOpNow = true;
success = true;
- } else if (StreamStatus.BACKOFF == status &&
- lastAcquireFailureWatch.elapsed(TimeUnit.MILLISECONDS)
< nextAcquireWaitTimeMs) {
- completeOpNow = true;
- success = false;
} else if (failFastOnStreamNotReady) {
- notifyAcquireThread = true;
- completeOpNow = false;
- success = false;
op.fail(new StreamNotReadyException("Stream " + name + "
is not ready; status = " + status));
- } else { // closing & initializing
- notifyAcquireThread = true;
+ return;
+ } else { // the stream is still initializing
pendingOps.add(op);
pendingOpsCounter.inc();
if (1 == pendingOps.size()) {
@@ -500,14 +418,15 @@ public class StreamImpl implements Stream {
}
}
}
- if (notifyAcquireThread && !suspended) {
- scheduleTryAcquireOnce(0L);
- }
if (completeOpNow) {
executeOp(op, success);
}
}
+ //
+ // Execute operations and handle exceptions on operations
+ //
+
/**
* Execute the <i>op</i> immediately.
*
@@ -516,20 +435,7 @@ public class StreamImpl implements Stream {
* @param success
* whether the operation is success or not.
*/
- void executeOp(StreamOp op, boolean success) {
- closeLock.readLock().lock();
- try {
- if (StreamStatus.isUnavailable(status)) {
- op.fail(new StreamUnavailableException("Stream " + name + " is
closed."));
- return;
- }
- doExecuteOp(op, success);
- } finally {
- closeLock.readLock().unlock();
- }
- }
-
- private void doExecuteOp(final StreamOp op, boolean success) {
+ void executeOp(final StreamOp op, boolean success) {
final AsyncLogWriter writer;
final Throwable lastException;
synchronized (this) {
@@ -552,7 +458,7 @@ public class StreamImpl implements Stream {
case FOUND:
assert(cause instanceof
OwnershipAcquireFailedException);
countAsException = false;
- handleOwnershipAcquireFailedException(op,
(OwnershipAcquireFailedException) cause);
+ handleExceptionOnStreamOp(op, cause);
break;
case ALREADY_CLOSED:
assert(cause instanceof AlreadyClosedException);
@@ -573,13 +479,14 @@ public class StreamImpl implements Stream {
case OVER_CAPACITY:
op.fail(cause);
break;
- // exceptions that *could* / *might* be recovered by
creating a new writer
+ // the DL writer hits exception, simple set the stream
to error status
+ // and fail the request
default:
- handleRecoverableDLException(op, cause);
+ handleExceptionOnStreamOp(op, cause);
break;
}
} else {
- handleUnknownException(op, cause);
+ handleExceptionOnStreamOp(op, cause);
}
if (countAsException) {
countException(cause, streamExceptionStatLogger);
@@ -587,88 +494,41 @@ public class StreamImpl implements Stream {
}
});
} else {
- op.fail(lastException);
- }
- }
-
- /**
- * Handle recoverable dl exception.
- *
- * @param op
- * stream operation executing
- * @param cause
- * exception received when executing <i>op</i>
- */
- private void handleRecoverableDLException(StreamOp op, final Throwable
cause) {
- AsyncLogWriter oldWriter = null;
- boolean statusChanged = false;
- synchronized (this) {
- if (StreamStatus.INITIALIZED == status) {
- oldWriter = setStreamStatus(StreamStatus.FAILED,
StreamStatus.INITIALIZED,
- null, null, cause);
- statusChanged = true;
+ if (null != lastException) {
+ op.fail(lastException);
+ } else {
+ op.fail(new StreamUnavailableException("Stream " + name + " is
closed."));
}
}
- if (statusChanged) {
- Abortables.asyncAbort(oldWriter, false);
- logger.error("Failed to write data into stream {} : ", name,
cause);
- scheduleTryAcquireOnce(0L);
- }
- op.fail(cause);
}
/**
- * Handle unknown exception when executing <i>op</i>.
+ * Handle exception when executing <i>op</i>.
*
* @param op
* stream operation executing
* @param cause
* exception received when executing <i>op</i>
*/
- private void handleUnknownException(StreamOp op, final Throwable cause) {
+ private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause)
{
AsyncLogWriter oldWriter = null;
boolean statusChanged = false;
synchronized (this) {
if (StreamStatus.INITIALIZED == status) {
- oldWriter = setStreamStatus(StreamStatus.FAILED,
StreamStatus.INITIALIZED,
- null, null, cause);
+ oldWriter = setStreamStatus(StreamStatus.ERROR,
StreamStatus.INITIALIZED, null, cause);
statusChanged = true;
}
}
if (statusChanged) {
Abortables.asyncAbort(oldWriter, false);
- logger.error("Failed to write data into stream {} : ", name,
cause);
- scheduleTryAcquireOnce(0L);
- }
- op.fail(cause);
- }
-
- /**
- * Handle losing ownership during executing <i>op</i>.
- *
- * @param op
- * stream operation executing
- * @param oafe
- * the ownership exception received when executing <i>op</i>
- */
- private void handleOwnershipAcquireFailedException(StreamOp op, final
OwnershipAcquireFailedException oafe) {
- logger.warn("Failed to write data into stream {} because stream is
acquired by {} : {}",
- new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
- AsyncLogWriter oldWriter = null;
- boolean statusChanged = false;
- synchronized (this) {
- if (StreamStatus.INITIALIZED == status) {
- oldWriter =
- setStreamStatus(StreamStatus.BACKOFF,
StreamStatus.INITIALIZED,
- null, oafe.getCurrentOwner(), oafe);
- statusChanged = true;
+ if (isCriticalException(cause)) {
+ logger.error("Failed to write data into stream {} : ", name,
cause);
+ } else {
+ logger.warn("Failed to write data into stream {} : {}", name,
cause.getMessage());
}
+ requestClose("Failed to write data into stream " + name + " : " +
cause.getMessage());
}
- if (statusChanged) {
- Abortables.asyncAbort(oldWriter, false);
- scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
- }
- op.fail(oafe);
+ op.fail(cause);
}
/**
@@ -680,129 +540,126 @@ public class StreamImpl implements Stream {
fatalErrorHandler.notifyFatalError();
}
- void countException(Throwable t, StatsLogger streamExceptionLogger) {
- String exceptionName = null == t ? "null" : t.getClass().getName();
- Counter counter = exceptionCounters.get(exceptionName);
- if (null == counter) {
- counter = exceptionStatLogger.getCounter(exceptionName);
- Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName,
counter);
- if (null != oldCounter) {
- counter = oldCounter;
- }
- }
- counter.inc();
- streamExceptionLogger.getCounter(exceptionName).inc();
- }
+ //
+ // Acquire streams
+ //
Future<Boolean> acquireStream() {
- // Reset this flag so the acquire thread knows whether re-acquire is
needed.
- writeSinceLastAcquire = false;
-
final Stopwatch stopwatch = Stopwatch.createStarted();
final Promise<Boolean> acquirePromise = new Promise<Boolean>();
manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new
FutureEventListener<AsyncLogWriter>() {
@Override
public void onSuccess(AsyncLogWriter w) {
- synchronized (txnLock) {
- sequencer.setLastId(w.getLastTxId());
- }
- AsyncLogWriter oldWriter;
- Queue<StreamOp> oldPendingOps;
- boolean success;
- synchronized (StreamImpl.this) {
- oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
- StreamStatus.INITIALIZING, w, null, null);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<StreamOp>();
- success = true;
- }
- // check if the stream is allowed to be acquired
- if (!streamManager.allowAcquire(StreamImpl.this)) {
- if (null != oldWriter) {
- Abortables.asyncAbort(oldWriter, true);
- }
- int maxAcquiredPartitions =
dynConf.getMaxAcquiredPartitionsPerProxy();
- StreamUnavailableException sue = new
StreamUnavailableException("Stream " + partition.getStream()
- + " is not allowed to acquire more than " +
maxAcquiredPartitions + " partitions");
- countException(sue, exceptionStatLogger);
- logger.error("Failed to acquire stream {} because it is
unavailable : {}",
- name, sue.getMessage());
- synchronized (this) {
- oldWriter = setStreamStatus(StreamStatus.ERROR,
- StreamStatus.INITIALIZED, null, null, sue);
- // we don't switch the pending ops since they are
already switched
- // when setting the status to initialized
- success = false;
- }
- }
- processPendingRequestsAfterOpen(success, oldWriter,
oldPendingOps);
+ onAcquireStreamSuccess(w, stopwatch, acquirePromise);
}
@Override
public void onFailure(Throwable cause) {
- AsyncLogWriter oldWriter;
- Queue<StreamOp> oldPendingOps;
- boolean success;
- if (cause instanceof AlreadyClosedException) {
- countException(cause, streamExceptionStatLogger);
- handleAlreadyClosedException((AlreadyClosedException)
cause);
- return;
- } else if (cause instanceof OwnershipAcquireFailedException) {
- OwnershipAcquireFailedException oafe =
(OwnershipAcquireFailedException) cause;
- logger.warn("Failed to acquire stream ownership for {},
current owner is {} : {}",
- new Object[]{name, oafe.getCurrentOwner(),
oafe.getMessage()});
- synchronized (StreamImpl.this) {
- oldWriter = setStreamStatus(StreamStatus.BACKOFF,
- StreamStatus.INITIALIZING, null,
oafe.getCurrentOwner(), oafe);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<StreamOp>();
- success = false;
- }
- } else if (cause instanceof InvalidStreamNameException) {
- InvalidStreamNameException isne =
(InvalidStreamNameException) cause;
- countException(isne, streamExceptionStatLogger);
- logger.error("Failed to acquire stream {} due to its name
is invalid", name);
- synchronized (StreamImpl.this) {
- oldWriter = setStreamStatus(StreamStatus.ERROR,
- StreamStatus.INITIALIZING, null, null, isne);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<StreamOp>();
- success = false;
- }
- } else {
- countException(cause, streamExceptionStatLogger);
- logger.error("Failed to initialize stream {} : ", name,
cause);
- synchronized (StreamImpl.this) {
- oldWriter = setStreamStatus(StreamStatus.FAILED,
- StreamStatus.INITIALIZING, null, null, cause);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<StreamOp>();
- success = false;
- }
- }
- processPendingRequestsAfterOpen(success, oldWriter,
oldPendingOps);
+ onAcquireStreamFailure(cause, stopwatch, acquirePromise);
}
- void processPendingRequestsAfterOpen(boolean success,
- AsyncLogWriter oldWriter,
- Queue<StreamOp>
oldPendingOps) {
- if (success) {
-
streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
- } else {
-
streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
- }
- for (StreamOp op : oldPendingOps) {
- executeOp(op, success);
- pendingOpsCounter.dec();
- }
- Abortables.asyncAbort(oldWriter, true);
- FutureUtils.setValue(acquirePromise, success);
- }
}, scheduler, getStreamName()));
return acquirePromise;
}
+ private void onAcquireStreamSuccess(AsyncLogWriter w,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) {
+ synchronized (txnLock) {
+ sequencer.setLastId(w.getLastTxId());
+ }
+ AsyncLogWriter oldWriter;
+ Queue<StreamOp> oldPendingOps;
+ boolean success;
+ synchronized (StreamImpl.this) {
+ oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
+ StreamStatus.INITIALIZING, w, null);
+ oldPendingOps = pendingOps;
+ pendingOps = new ArrayDeque<StreamOp>();
+ success = true;
+ }
+ // check if the stream is allowed to be acquired
+ if (!streamManager.allowAcquire(StreamImpl.this)) {
+ if (null != oldWriter) {
+ Abortables.asyncAbort(oldWriter, true);
+ }
+ int maxAcquiredPartitions =
dynConf.getMaxAcquiredPartitionsPerProxy();
+ StreamUnavailableException sue = new
StreamUnavailableException("Stream " + partition.getStream()
+ + " is not allowed to acquire more than " +
maxAcquiredPartitions + " partitions");
+ countException(sue, exceptionStatLogger);
+ logger.error("Failed to acquire stream {} because it is
unavailable : {}",
+ name, sue.getMessage());
+ synchronized (this) {
+ oldWriter = setStreamStatus(StreamStatus.ERROR,
+ StreamStatus.INITIALIZED, null, sue);
+ // we don't switch the pending ops since they are already
switched
+ // when setting the status to initialized
+ success = false;
+ }
+ }
+ processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps,
stopwatch, acquirePromise);
+ }
+
+ private void onAcquireStreamFailure(Throwable cause,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) {
+ AsyncLogWriter oldWriter;
+ Queue<StreamOp> oldPendingOps;
+ boolean success;
+ if (cause instanceof AlreadyClosedException) {
+ countException(cause, streamExceptionStatLogger);
+ handleAlreadyClosedException((AlreadyClosedException) cause);
+ return;
+ } else {
+ if (isCriticalException(cause)) {
+ countException(cause, streamExceptionStatLogger);
+ logger.error("Failed to acquire stream {} : ", name, cause);
+ } else {
+ logger.warn("Failed to acquire stream {} : {}", name,
cause.getMessage());
+ }
+ synchronized (StreamImpl.this) {
+ oldWriter = setStreamStatus(StreamStatus.ERROR,
+ StreamStatus.INITIALIZING, null, cause);
+ oldPendingOps = pendingOps;
+ pendingOps = new ArrayDeque<StreamOp>();
+ success = false;
+ }
+ }
+ processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps,
stopwatch, acquirePromise);
+ }
+
+ /**
+ * Process the pending request after acquired stream.
+ *
+ * @param success whether the acquisition succeed or not
+ * @param oldWriter the old writer to abort
+ * @param oldPendingOps the old pending ops to execute
+ * @param stopwatch stopwatch to measure the time spent on acquisition
+ * @param acquirePromise the promise to complete the acquire operation
+ */
+ void processPendingRequestsAfterAcquire(boolean success,
+ AsyncLogWriter oldWriter,
+ Queue<StreamOp> oldPendingOps,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) {
+ if (success) {
+
streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ } else {
+
streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ }
+ for (StreamOp op : oldPendingOps) {
+ executeOp(op, success);
+ pendingOpsCounter.dec();
+ }
+ Abortables.asyncAbort(oldWriter, true);
+ FutureUtils.setValue(acquirePromise, success);
+ }
+
+ //
+ // Stream Status Changes
+ //
+
synchronized void setStreamInErrorStatus() {
if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) {
return;
@@ -819,8 +676,6 @@ public class StreamImpl implements Stream {
* old status
* @param writer
* new log writer
- * @param owner
- * new owner
* @param t
* new exception
* @return old writer if it exists
@@ -828,7 +683,6 @@ public class StreamImpl implements Stream {
synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus,
StreamStatus oldStatus,
AsyncLogWriter writer,
- String owner,
Throwable t) {
if (oldStatus != this.status) {
logger.info("Stream {} status already changed from {} -> {} when
trying to change it to {}",
@@ -836,6 +690,11 @@ public class StreamImpl implements Stream {
return null;
}
+ String owner = null;
+ if (t instanceof OwnershipAcquireFailedException) {
+ owner = ((OwnershipAcquireFailedException) t).getCurrentOwner();
+ }
+
AsyncLogWriter oldWriter = this.writer;
this.writer = writer;
if (null != owner && owner.equals(clientId)) {
@@ -852,10 +711,6 @@ public class StreamImpl implements Stream {
}
this.lastException = t;
this.status = newStatus;
- if (StreamStatus.BACKOFF == newStatus && null != owner) {
- // start failure watch
- this.lastAcquireFailureWatch.reset().start();
- }
if (StreamStatus.INITIALIZED == newStatus) {
streamManager.notifyAcquired(this);
logger.info("Inserted acquired stream {} -> writer {}", name,
this);
@@ -866,12 +721,16 @@ public class StreamImpl implements Stream {
return oldWriter;
}
+ //
+ // Stream Close Functions
+ //
+
void close(DistributedLogManager dlm) {
if (null != dlm) {
try {
dlm.close();
} catch (IOException ioe) {
- logger.warn("Failed to close dlm for {} : ", ioe);
+ logger.warn("Failed to close dlm for {} : ", name, ioe);
}
}
}
@@ -902,12 +761,16 @@ public class StreamImpl implements Stream {
// them.
close(abort);
if (uncache) {
+ final long probationTimeoutMs;
+ if (null != owner) {
+ probationTimeoutMs = 2 *
dlConfig.getZKSessionTimeoutMilliseconds() / 3;
+ } else {
+ probationTimeoutMs = 0L;
+ }
closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
@Override
public BoxedUnit apply(Void result) {
- if (streamManager.notifyRemoved(StreamImpl.this)) {
- logger.info("Removed cached stream {} after closed.",
name);
- }
+ streamManager.scheduleRemoval(StreamImpl.this,
probationTimeoutMs);
return BoxedUnit.UNIT;
}
});
@@ -949,14 +812,6 @@ public class StreamImpl implements Stream {
closeLock.writeLock().unlock();
}
logger.info("Closing stream {} ...", name);
- running = false;
- // stop any outstanding ownership acquire actions first
- synchronized (this) {
- if (null != tryAcquireScheduledFuture) {
- tryAcquireScheduledFuture.cancel(true);
- }
- }
- logger.info("Stopped threads of stream {}.", name);
// Close the writers to release the locks before failing the requests
Future<Void> closeWriterFuture;
if (abort) {
@@ -1016,19 +871,6 @@ public class StreamImpl implements Stream {
// Test-only apis
@VisibleForTesting
- public StreamImpl suspendAcquiring() {
- suspended = true;
- return this;
- }
-
- @VisibleForTesting
- public StreamImpl resumeAcquiring() {
- suspended = false;
- scheduleTryAcquireOnce(0L);
- return this;
- }
-
- @VisibleForTesting
public int numPendingOps() {
Queue<StreamOp> queue = pendingOps;
return null == queue ? 0 : queue.size();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
index 972eb55..e171e46 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
@@ -43,10 +43,11 @@ public interface StreamManager {
/**
* Get a cached stream and create a new one if it doesnt exist.
- * @param stream name
+ * @param streamName stream name
+ * @param start whether to start the stream after it is created.
* @return future satisfied once close complete
*/
- Stream getOrCreateStream(String stream) throws IOException;
+ Stream getOrCreateStream(String streamName, boolean start) throws IOException;
/**
* Asynchronously create a new stream.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
index aa08a24..df336fe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
@@ -33,15 +33,14 @@ import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.util.Future;
import com.twitter.util.Promise;
-import java.io.IOException;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
@@ -233,7 +232,7 @@ public class StreamManagerImpl implements StreamManager {
}
@Override
- public Stream getOrCreateStream(String streamName) throws IOException {
+ public Stream getOrCreateStream(String streamName, boolean start) throws IOException {
Stream stream = streams.get(streamName);
if (null == stream) {
closeLock.readLock().lock();
@@ -261,7 +260,9 @@ public class StreamManagerImpl implements StreamManager {
numCached.getAndIncrement();
logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream);
stream.initialize();
- stream.start();
+ if (start) {
+ stream.start();
+ }
}
} finally {
closeLock.readLock().unlock();
@@ -283,8 +284,10 @@ public class StreamManagerImpl implements StreamManager {
@Override
public void scheduleRemoval(final Stream stream, long delayMs) {
- logger.info("Scheduling removal of stream {} from cache after {} sec.",
- stream.getStreamName(), delayMs);
+ if (delayMs > 0) {
+ logger.info("Scheduling removal of stream {} from cache after {} sec.",
+ stream.getStreamName(), delayMs);
+ }
schedule(new Runnable() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 17fae4a..4195ed3 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -89,7 +89,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
dlConf.addConfiguration(conf);
dlConf.setLockTimeout(0)
.setOutputBufferSize(0)
- .setPeriodicFlushFrequencyMilliSeconds(10);
+ .setPeriodicFlushFrequencyMilliSeconds(10)
+ .setSchedulerShutdownTimeoutMs(100);
serverConf = newLocalServerConf();
uri = createDLMURI("/" + testName.getMethodName());
ensureURICreated(uri);
@@ -171,10 +172,11 @@ public class TestDistributedLogService extends TestDistributedLogBase {
public void testAcquireStreams() throws Exception {
String streamName = testName.getMethodName();
StreamImpl s0 = createUnstartedStream(service, streamName);
- s0.suspendAcquiring();
- DistributedLogServiceImpl service1 = createService(serverConf, dlConf);
+ ServerConfiguration serverConf1 = new ServerConfiguration();
+ serverConf1.addConfiguration(serverConf);
+ serverConf1.setServerPort(9999);
+ DistributedLogServiceImpl service1 = createService(serverConf1, dlConf);
StreamImpl s1 = createUnstartedStream(service1, streamName);
- s1.suspendAcquiring();
// create write ops
WriteOp op0 = createWriteOp(service, streamName, 0L);
@@ -190,7 +192,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
1, s1.numPendingOps());
// start acquiring s0
- s0.resumeAcquiring().start();
+ s0.start();
WriteResponse wr0 = Await.result(op0.result());
assertEquals("Op 0 should succeed",
StatusCode.SUCCESS, wr0.getHeader().getCode());
@@ -201,12 +203,12 @@ public class TestDistributedLogService extends TestDistributedLogBase {
assertNull(s0.getLastException());
// start acquiring s1
- s1.resumeAcquiring().start();
+ s1.start();
WriteResponse wr1 = Await.result(op1.result());
assertEquals("Op 1 should fail",
StatusCode.FOUND, wr1.getHeader().getCode());
- assertEquals("Service 1 should be in BACKOFF state",
- StreamStatus.BACKOFF, s1.getStatus());
+ assertEquals("Service 1 should be in ERROR state",
+ StreamStatus.ERROR, s1.getStatus());
assertNotNull(s1.getManager());
assertNull(s1.getWriter());
assertNotNull(s1.getLastException());
@@ -727,7 +729,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
for (Stream s : streamManager.getAcquiredStreams().values()) {
StreamImpl stream = (StreamImpl) s;
- stream.setStatus(StreamStatus.FAILED);
+ stream.setStatus(StreamStatus.ERROR);
}
Future<List<Void>> closeResult = localService.closeStreams();