http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java index 4dd4c12..14ebf4a 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java @@ -17,6 +17,7 @@ */ package org.apache.distributedlog.impl.logsegment; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.LogSegmentMetadata; @@ -35,10 +36,8 @@ import org.apache.distributedlog.logsegment.LogSegmentEntryWriter; import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; import org.apache.distributedlog.metadata.LogMetadataForWriter; import org.apache.distributedlog.util.Allocator; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -65,13 +64,13 @@ public class BKLogSegmentEntryStore implements private final LogSegmentMetadata segment; private final long startEntryId; - private final Promise<LogSegmentEntryReader> openPromise; + private final CompletableFuture<LogSegmentEntryReader> openPromise; OpenReaderRequest(LogSegmentMetadata segment, long startEntryId) { this.segment = segment; this.startEntryId = startEntryId; - this.openPromise = new Promise<LogSegmentEntryReader>(); + this.openPromise = new CompletableFuture<LogSegmentEntryReader>(); } } @@ -79,11 +78,11 @@ public class BKLogSegmentEntryStore implements private static class DeleteLogSegmentRequest { private final LogSegmentMetadata segment; - private final Promise<LogSegmentMetadata> deletePromise; + private final CompletableFuture<LogSegmentMetadata> deletePromise; DeleteLogSegmentRequest(LogSegmentMetadata segment) { this.segment = segment; - this.deletePromise = new Promise<LogSegmentMetadata>(); + this.deletePromise = new CompletableFuture<LogSegmentMetadata>(); } } @@ -119,13 +118,13 @@ public class BKLogSegmentEntryStore implements } @Override - public Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) { + public CompletableFuture<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) { DeleteLogSegmentRequest request = new DeleteLogSegmentRequest(segment); BookKeeper bk; try { bk = this.bkc.get(); } catch (IOException e) { - return Future.exception(e); + return FutureUtils.exception(e); } bk.asyncDeleteLedger(segment.getLogSegmentId(), this, request); return request.deletePromise; @@ -141,11 +140,11 @@ public class BKLogSegmentEntryStore implements logger.error("Couldn't delete ledger {} from bookkeeper for {} : {}", new Object[]{ deleteRequest.segment.getLogSegmentId(), deleteRequest.segment, BKException.getMessage(rc) }); - FutureUtils.setException(deleteRequest.deletePromise, + FutureUtils.completeExceptionally(deleteRequest.deletePromise, new BKTransmitException("Couldn't delete log segment " + deleteRequest.segment, rc)); return; } - FutureUtils.setValue(deleteRequest.deletePromise, deleteRequest.segment); + FutureUtils.complete(deleteRequest.deletePromise, deleteRequest.segment); } // @@ -186,13 +185,13 @@ public class BKLogSegmentEntryStore implements // @Override - public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, + public CompletableFuture<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, long startEntryId) { BookKeeper bk; try { bk = this.bkc.get(); } catch (IOException e) { - return Future.exception(e); + return FutureUtils.exception(e); } OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId); if (segment.isInProgress()) { @@ -217,7 +216,7 @@ public class BKLogSegmentEntryStore implements public void openComplete(int rc, LedgerHandle lh, Object ctx) { OpenReaderRequest request = (OpenReaderRequest) ctx; if (BKException.Code.OK != rc) { - FutureUtils.setException( + FutureUtils.completeExceptionally( request.openPromise, new BKTransmitException("Failed to open ledger handle for log segment " + request.segment, rc)); return; @@ -233,28 +232,28 @@ public class BKLogSegmentEntryStore implements conf, statsLogger, failureInjector); - FutureUtils.setValue(request.openPromise, reader); + FutureUtils.complete(request.openPromise, reader); } catch (IOException e) { - FutureUtils.setException(request.openPromise, e); + FutureUtils.completeExceptionally(request.openPromise, e); } } @Override - public Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment, + public CompletableFuture<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment, final boolean fence) { final BookKeeper bk; try { bk = this.bkc.get(); } catch (IOException e) { - return Future.exception(e); + return FutureUtils.exception(e); } - final Promise<LogSegmentRandomAccessEntryReader> openPromise = new Promise<LogSegmentRandomAccessEntryReader>(); + final CompletableFuture<LogSegmentRandomAccessEntryReader> openPromise = new CompletableFuture<LogSegmentRandomAccessEntryReader>(); AsyncCallback.OpenCallback openCallback = new AsyncCallback.OpenCallback() { @Override public void openComplete(int rc, LedgerHandle lh, Object ctx) { if (BKException.Code.OK != rc) { - FutureUtils.setException( + FutureUtils.completeExceptionally( openPromise, new BKTransmitException("Failed to open ledger handle for log segment " + segment, rc)); return; @@ -263,7 +262,7 @@ public class BKLogSegmentEntryStore implements segment, lh, conf); - FutureUtils.setValue(openPromise, reader); + FutureUtils.complete(openPromise, reader); } }; if (segment.isInProgress() && !fence) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java index d7b331b..254345e 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java @@ -18,14 +18,13 @@ package org.apache.distributedlog.impl.logsegment; import com.google.common.collect.Lists; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.Entry; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.exceptions.BKTransmitException; import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import com.twitter.util.Promise; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerEntry; @@ -49,7 +48,7 @@ class BKLogSegmentRandomAccessEntryReader implements // state private final LogSegmentMetadata metadata; private final LedgerHandle lh; - private Promise<Void> closePromise = null; + private CompletableFuture<Void> closePromise = null; BKLogSegmentRandomAccessEntryReader(LogSegmentMetadata metadata, LedgerHandle lh, @@ -68,8 +67,8 @@ class BKLogSegmentRandomAccessEntryReader implements } @Override - public Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) { - Promise<List<Entry.Reader>> promise = new Promise<List<Entry.Reader>>(); + public CompletableFuture<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) { + CompletableFuture<List<Entry.Reader>> promise = new CompletableFuture<List<Entry.Reader>>(); lh.asyncReadEntries(startEntryId, endEntryId, this, promise); return promise; } @@ -86,34 +85,37 @@ class BKLogSegmentRandomAccessEntryReader implements @Override public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) { - Promise<List<Entry.Reader>> promise = (Promise<List<Entry.Reader>>) ctx; + CompletableFuture<List<Entry.Reader>> promise = (CompletableFuture<List<Entry.Reader>>) ctx; if (BKException.Code.OK == rc) { List<Entry.Reader> entryList = Lists.newArrayList(); while (entries.hasMoreElements()) { try { entryList.add(processReadEntry(entries.nextElement())); } catch (IOException ioe) { - FutureUtils.setException(promise, ioe); + FutureUtils.completeExceptionally(promise, ioe); return; } } - FutureUtils.setValue(promise, entryList); + FutureUtils.complete(promise, entryList); } else { - FutureUtils.setException(promise, + FutureUtils.completeExceptionally(promise, new BKTransmitException("Failed to read entries :", rc)); } } @Override - public Future<Void> asyncClose() { - final Promise<Void> closeFuture; + public CompletableFuture<Void> asyncClose() { + final CompletableFuture<Void> closeFuture; synchronized (this) { if (null != closePromise) { return closePromise; } - closeFuture = closePromise = new Promise<Void>(); + closeFuture = closePromise = new CompletableFuture<Void>(); } - BKUtils.closeLedgers(lh).proxyTo(closeFuture); + FutureUtils.proxyTo( + BKUtils.closeLedgers(lh), + closeFuture + ); return closeFuture; } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java index 3c02740..82ba775 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java @@ -18,11 +18,9 @@ package org.apache.distributedlog.impl.logsegment; import com.google.common.collect.Lists; -import org.apache.distributedlog.function.VoidFunctions; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import com.twitter.util.Futures; -import com.twitter.util.Promise; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.common.functions.VoidFunctions; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; @@ -40,15 +38,15 @@ public class BKUtils { * @param lh ledger handle * @return future represents close result. */ - public static Future<Void> closeLedger(LedgerHandle lh) { - final Promise<Void> closePromise = new Promise<Void>(); + public static CompletableFuture<Void> closeLedger(LedgerHandle lh) { + final CompletableFuture<Void> closePromise = new CompletableFuture<Void>(); lh.asyncClose(new AsyncCallback.CloseCallback() { @Override public void closeComplete(int rc, LedgerHandle lh, Object ctx) { if (BKException.Code.OK != rc) { - FutureUtils.setException(closePromise, BKException.create(rc)); + FutureUtils.completeExceptionally(closePromise, BKException.create(rc)); } else { - FutureUtils.setValue(closePromise, null); + FutureUtils.complete(closePromise, null); } } }, null); @@ -61,12 +59,12 @@ public class BKUtils { * @param lhs a list of ledgers * @return future represents close results. */ - public static Future<Void> closeLedgers(LedgerHandle ... lhs) { - List<Future<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length); + public static CompletableFuture<Void> closeLedgers(LedgerHandle ... lhs) { + List<CompletableFuture<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length); for (LedgerHandle lh : lhs) { closeResults.add(closeLedger(lh)); } - return Futures.collect(closeResults).map(VoidFunctions.LIST_TO_VOID_FUNC); + return FutureUtils.collect(closeResults).thenApply(VoidFunctions.LIST_TO_VOID_FUNC); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java index 30f9dd4..9b02462 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java @@ -20,10 +20,12 @@ package org.apache.distributedlog.impl.metadata; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.DistributedLogConstants; import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.exceptions.DLException; import org.apache.distributedlog.exceptions.DLInterruptedException; import org.apache.distributedlog.exceptions.InvalidStreamNameException; import org.apache.distributedlog.exceptions.LockCancelledException; @@ -42,18 +44,14 @@ import org.apache.distributedlog.metadata.LogMetadata; import org.apache.distributedlog.metadata.LogMetadataForReader; import org.apache.distributedlog.metadata.LogMetadataForWriter; import org.apache.distributedlog.util.DLUtils; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.SchedulerUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.common.util.SchedulerUtils; import org.apache.distributedlog.zk.LimitedPermitManager; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.PermitManager; +import org.apache.distributedlog.common.util.PermitManager; import org.apache.distributedlog.util.Transaction; import org.apache.distributedlog.util.Utils; import org.apache.distributedlog.zk.ZKTransaction; -import com.twitter.util.ExceptionalFunction; -import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.versioning.Versioned; @@ -69,10 +67,7 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; -import java.io.File; import java.io.IOException; import java.net.URI; import java.util.List; @@ -120,14 +115,9 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) { if (createIfNull && null == lockStateExecutor) { - StatsLogger lockStateStatsLogger = statsLogger.scope("lock_scheduler"); lockStateExecutor = OrderedScheduler.newBuilder() .name("DLM-LockState") .corePoolSize(conf.getNumLockStateThreads()) - .statsLogger(lockStateStatsLogger) - .perExecutorStatsLogger(lockStateStatsLogger) - .traceTaskExecution(conf.getEnableTaskExecutionStats()) - .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros()) .build(); } return lockStateExecutor; @@ -174,21 +164,21 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } @Override - public Future<Void> logExists(URI uri, final String logName) { + public CompletableFuture<Void> logExists(URI uri, final String logName) { final String logSegmentsPath = LogMetadata.getLogSegmentsPath( uri, logName, conf.getUnpartitionedStreamName()); - final Promise<Void> promise = new Promise<Void>(); + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); try { final ZooKeeper zk = zooKeeperClient.get(); zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() { @Override public void processResult(int syncRc, String path, Object syncCtx) { if (KeeperException.Code.NONODE.intValue() == syncRc) { - promise.setException(new LogNotFoundException( + promise.completeExceptionally(new LogNotFoundException( String.format("Log %s does not exist or has been deleted", logName))); return; } else if (KeeperException.Code.OK.intValue() != syncRc){ - promise.setException(new ZKException("Error on checking log existence for " + logName, + promise.completeExceptionally(new ZKException("Error on checking log existence for " + logName, KeeperException.create(KeeperException.Code.get(syncRc)))); return; } @@ -196,12 +186,12 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (KeeperException.Code.OK.intValue() == rc) { - promise.setValue(null); + promise.complete(null); } else if (KeeperException.Code.NONODE.intValue() == rc) { - promise.setException(new LogNotFoundException( + promise.completeExceptionally(new LogNotFoundException( String.format("Log %s does not exist or has been deleted", logName))); } else { - promise.setException(new ZKException("Error on checking log existence for " + logName, + promise.completeExceptionally(new ZKException("Error on checking log existence for " + logName, KeeperException.create(KeeperException.Code.get(rc)))); } } @@ -211,10 +201,10 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } catch (InterruptedException ie) { LOG.error("Interrupted while reading {}", logSegmentsPath, ie); - promise.setException(new DLInterruptedException("Interrupted while checking " + promise.completeExceptionally(new DLInterruptedException("Interrupted while checking " + logSegmentsPath, ie)); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } return promise; } @@ -237,15 +227,13 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { // Create Read Lock // - private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata, + private CompletableFuture<Void> ensureReadLockPathExist(final LogMetadata logMetadata, final String readLockPath) { - final Promise<Void> promise = new Promise<Void>(); - promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - FutureUtils.setException(promise, new LockCancelledException(readLockPath, - "Could not ensure read lock path", t)); - return null; + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); + promise.whenComplete((value, cause) -> { + if (cause instanceof CancellationException) { + FutureUtils.completeExceptionally(promise, new LockCancelledException(readLockPath, + "Could not ensure read lock path", cause)); } }); Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath()); @@ -255,21 +243,21 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { @Override public void processResult(final int rc, final String path, Object ctx, String name) { if (KeeperException.Code.NONODE.intValue() == rc) { - FutureUtils.setException(promise, new LogNotFoundException( + FutureUtils.completeExceptionally(promise, new LogNotFoundException( String.format("Log %s does not exist or has been deleted", logMetadata.getFullyQualifiedName()))); } else if (KeeperException.Code.OK.intValue() == rc) { - FutureUtils.setValue(promise, null); + FutureUtils.complete(promise, null); LOG.trace("Created path {}.", path); } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { - FutureUtils.setValue(promise, null); + FutureUtils.complete(promise, null); LOG.trace("Path {} is already existed.", path); } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) { - FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path)); + FutureUtils.completeExceptionally(promise, new ZooKeeperClient.ZooKeeperConnectionException(path)); } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) { - FutureUtils.setException(promise, new DLInterruptedException(path)); + FutureUtils.completeExceptionally(promise, new DLInterruptedException(path)); } else { - FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc))); + FutureUtils.completeExceptionally(promise, KeeperException.create(KeeperException.Code.get(rc))); } } }, null); @@ -277,28 +265,19 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } @Override - public Future<DistributedLock> createReadLock(final LogMetadataForReader metadata, + public CompletableFuture<DistributedLock> createReadLock(final LogMetadataForReader metadata, Optional<String> readerId) { final String readLockPath = metadata.getReadLockPath(readerId); - return ensureReadLockPathExist(metadata, readLockPath).flatMap( - new ExceptionalFunction<Void, Future<DistributedLock>>() { - @Override - public Future<DistributedLock> applyE(Void value) throws Throwable { - // Unfortunately this has a blocking call which we should not execute on the - // ZK completion thread - return scheduler.apply(new ExceptionalFunction0<DistributedLock>() { - @Override - public DistributedLock applyE() throws Throwable { - return new ZKDistributedLock( - getLockStateExecutor(true), - getLockFactory(true), - readLockPath, - conf.getLockTimeoutMilliSeconds(), - statsLogger.scope("read_lock")); - } - }); - } - }); + return ensureReadLockPathExist(metadata, readLockPath) + .thenApplyAsync((value) -> { + DistributedLock lock = new ZKDistributedLock( + getLockStateExecutor(true), + getLockFactory(true), + readLockPath, + conf.getLockTimeoutMilliSeconds(), + statsLogger.scope("read_lock")); + return lock; + }, scheduler.chooseExecutor(readLockPath)); } // @@ -329,7 +308,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { (byte) (i)}; } - static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk, + static CompletableFuture<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk, String logRootPath, boolean ownAllocator) { // Note re. persistent lock state initialization: the read lock persistent state (path) is @@ -344,7 +323,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { final String allocationPath = logRootPath + ALLOCATION_PATH; int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1; - List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths); + List<CompletableFuture<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths); checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false)); checkFutures.add(Utils.zkGetData(zk, logRootPath, false)); checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false)); @@ -356,7 +335,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { checkFutures.add(Utils.zkGetData(zk, allocationPath, false)); } - return Future.collect(checkFutures); + return FutureUtils.collect(checkFutures); } static boolean pathExists(Versioned<byte[]> metadata) { @@ -374,7 +353,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { final List<ACL> acl, final boolean ownAllocator, final boolean createIfNotExists, - final Promise<List<Versioned<byte[]>>> promise) { + final CompletableFuture<List<Versioned<byte[]>>> promise) { final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size()); final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size()); CreateMode createMode = CreateMode.PERSISTENT; @@ -447,11 +426,11 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } if (zkOps.isEmpty()) { // nothing missed - promise.setValue(metadatas); + promise.complete(metadatas); return; } if (!createIfNotExists) { - promise.setException(new LogNotFoundException("Log " + logRootPath + " not found")); + promise.completeExceptionally(new LogNotFoundException("Log " + logRootPath + " not found")); return; } @@ -469,9 +448,9 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0))); } } - promise.setValue(finalMetadatas); + promise.complete(finalMetadatas); } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { - promise.setException(new LogExistsException("Someone just created log " + promise.completeExceptionally(new LogExistsException("Someone just created log " + logRootPath)); } else { if (LOG.isDebugEnabled()) { @@ -488,7 +467,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { LOG.debug("Failed to create log, full rc list = {}", resultCodeList); } - promise.setException(new ZKException("Failed to create log " + logRootPath, + promise.completeExceptionally(new ZKException("Failed to create log " + logRootPath, KeeperException.Code.get(rc))); } } @@ -538,7 +517,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { } } - static Future<LogMetadataForWriter> getLog(final URI uri, + static CompletableFuture<LogMetadataForWriter> getLog(final URI uri, final String logName, final String logIdentifier, final ZooKeeperClient zooKeeperClient, @@ -549,42 +528,47 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { PathUtils.validatePath(logRootPath); } catch (IllegalArgumentException e) { LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e}); - return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid")); + return FutureUtils.exception(new InvalidStreamNameException(logName, "Log name is invalid")); } try { final ZooKeeper zk = zooKeeperClient.get(); return checkLogMetadataPaths(zk, logRootPath, ownAllocator) - .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() { + .thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<List<Versioned<byte[]>>>>() { @Override - public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) { - Promise<List<Versioned<byte[]>>> promise = - new Promise<List<Versioned<byte[]>>>(); + public CompletableFuture<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) { + CompletableFuture<List<Versioned<byte[]>>> promise = + new CompletableFuture<List<Versioned<byte[]>>>(); createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(), ownAllocator, createIfNotExists, promise); return promise; } - }).map(new ExceptionalFunction<List<Versioned<byte[]>>, LogMetadataForWriter>() { + }).thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<LogMetadataForWriter>>() { @Override - public LogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException { - return processLogMetadatas( - uri, - logName, - logIdentifier, - metadatas, - ownAllocator); + public CompletableFuture<LogMetadataForWriter> apply(List<Versioned<byte[]>> metadatas) { + try { + return FutureUtils.value( + processLogMetadatas( + uri, + logName, + logIdentifier, + metadatas, + ownAllocator)); + } catch (UnexpectedException e) { + return FutureUtils.exception(e); + } } }); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName, + return FutureUtils.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName, KeeperException.Code.CONNECTIONLOSS)); } catch (InterruptedException e) { - return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e)); + return FutureUtils.exception(new DLInterruptedException("Interrupted on creating log " + logName, e)); } } @Override - public Future<LogMetadataForWriter> getLog(final URI uri, + public CompletableFuture<LogMetadataForWriter> getLog(final URI uri, final String logName, final boolean ownAllocator, final boolean createIfNotExists) { @@ -602,30 +586,30 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { // @Override - public Future<Void> deleteLog(URI uri, final String logName) { - final Promise<Void> promise = new Promise<Void>(); + public CompletableFuture<Void> deleteLog(URI uri, final String logName) { + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); try { String streamPath = LogMetadata.getLogStreamPath(uri, logName); ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { if (KeeperException.Code.OK.intValue() != rc) { - FutureUtils.setException(promise, + FutureUtils.completeExceptionally(promise, new ZKException("Encountered zookeeper issue on deleting log stream " + logName, KeeperException.Code.get(rc))); return; } - FutureUtils.setValue(promise, null); + FutureUtils.complete(promise, null); } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream " + FutureUtils.completeExceptionally(promise, new ZKException("Encountered zookeeper issue on deleting log stream " + logName, KeeperException.Code.CONNECTIONLOSS)); } catch (InterruptedException e) { - FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream " + FutureUtils.completeExceptionally(promise, new DLInterruptedException("Interrupted while deleting log stream " + logName)); } catch (KeeperException e) { - FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream " + FutureUtils.completeExceptionally(promise, new ZKException("Encountered zookeeper issue on deleting log stream " + logName, e)); } return promise; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java index 64abb77..302c666 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java @@ -17,27 +17,22 @@ */ package org.apache.distributedlog.impl.subscription; +import com.google.common.base.Charsets; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; - -import org.apache.distributedlog.subscription.SubscriptionStateStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; - -import com.google.common.base.Charsets; - +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.api.subscription.SubscriptionStateStore; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; - -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.util.Utils; -import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.exceptions.DLInterruptedException; -import com.twitter.util.Future; -import com.twitter.util.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ZKSubscriptionStateStore implements SubscriptionStateStore { @@ -60,16 +55,16 @@ public class ZKSubscriptionStateStore implements SubscriptionStateStore { * Get the last committed position stored for this subscription */ @Override - public Future<DLSN> getLastCommitPosition() { + public CompletableFuture<DLSN> getLastCommitPosition() { if (null != lastCommittedPosition.get()) { - return Future.value(lastCommittedPosition.get()); + return FutureUtils.value(lastCommittedPosition.get()); } else { return getLastCommitPositionFromZK(); } } - Future<DLSN> getLastCommitPositionFromZK() { - final Promise<DLSN> result = new Promise<DLSN>(); + CompletableFuture<DLSN> getLastCommitPositionFromZK() { + final CompletableFuture<DLSN> result = new CompletableFuture<DLSN>(); try { logger.debug("Reading last commit position from path {}", zkPath); zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() { @@ -77,25 +72,25 @@ public class ZKSubscriptionStateStore implements SubscriptionStateStore { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc); if (KeeperException.Code.NONODE.intValue() == rc) { - result.setValue(DLSN.NonInclusiveLowerBound); + result.complete(DLSN.NonInclusiveLowerBound); } else if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); + result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); } else { try { DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8)); - result.setValue(dlsn); + result.complete(dlsn); } catch (Exception t) { logger.warn("Invalid last commit position found from path {}", zkPath, t); // invalid dlsn recorded in subscription state store - result.setValue(DLSN.NonInclusiveLowerBound); + result.complete(DLSN.NonInclusiveLowerBound); } } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { - result.setException(zkce); + result.completeExceptionally(zkce); } catch (InterruptedException ie) { - result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie)); + result.completeExceptionally(new DLInterruptedException("getLastCommitPosition was interrupted", ie)); } return result; } @@ -106,7 +101,7 @@ public class ZKSubscriptionStateStore implements SubscriptionStateStore { * @param newPosition - new commit position */ @Override - public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) { + public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition) { if (null == lastCommittedPosition.get() || (newPosition.compareTo(lastCommittedPosition.get()) > 0)) { lastCommittedPosition.set(newPosition); @@ -115,7 +110,7 @@ public class ZKSubscriptionStateStore implements SubscriptionStateStore { zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT); } else { - return Future.Done(); + return FutureUtils.Void(); } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java index d75f5fc..0392264 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java @@ -17,30 +17,26 @@ */ package org.apache.distributedlog.impl.subscription; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.subscription.SubscriptionStateStore; -import org.apache.distributedlog.subscription.SubscriptionsStore; -import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; - -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.bookkeeper.meta.ZkVersion; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.api.subscription.SubscriptionStateStore; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.util.Utils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; /** * ZooKeeper Based Subscriptions Store. @@ -82,72 +78,62 @@ public class ZKSubscriptionsStore implements SubscriptionsStore { } @Override - public Future<DLSN> getLastCommitPosition(String subscriberId) { + public CompletableFuture<DLSN> getLastCommitPosition(String subscriberId) { return getSubscriber(subscriberId).getLastCommitPosition(); } @Override - public Future<Map<String, DLSN>> getLastCommitPositions() { - final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>(); + public CompletableFuture<Map<String, DLSN>> getLastCommitPositions() { + final CompletableFuture<Map<String, DLSN>> result = new CompletableFuture<Map<String, DLSN>>(); try { this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { if (KeeperException.Code.NONODE.intValue() == rc) { - result.setValue(new HashMap<String, DLSN>()); + result.complete(new HashMap<String, DLSN>()); } else if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); + result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); } else { getLastCommitPositions(result, children); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { - result.setException(zkce); + result.completeExceptionally(zkce); } catch (InterruptedException ie) { - result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie)); + result.completeExceptionally(new DLInterruptedException("getLastCommitPositions was interrupted", ie)); } return result; } - private void getLastCommitPositions(final Promise<Map<String, DLSN>> result, + private void getLastCommitPositions(final CompletableFuture<Map<String, DLSN>> result, List<String> subscribers) { - List<Future<Pair<String, DLSN>>> futures = - new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size()); + List<CompletableFuture<Pair<String, DLSN>>> futures = + new ArrayList<CompletableFuture<Pair<String, DLSN>>>(subscribers.size()); for (String s : subscribers) { final String subscriber = s; - Future<Pair<String, DLSN>> future = + CompletableFuture<Pair<String, DLSN>> future = // Get the last commit position from zookeeper - getSubscriber(subscriber).getLastCommitPositionFromZK().map( - new AbstractFunction1<DLSN, Pair<String, DLSN>>() { - @Override - public Pair<String, DLSN> apply(DLSN dlsn) { - return Pair.of(subscriber, dlsn); - } - }); + getSubscriber(subscriber).getLastCommitPositionFromZK().thenApply( + dlsn -> Pair.of(subscriber, dlsn)); futures.add(future); } - Future.collect(futures).foreach( - new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() { - @Override - public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) { - Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>(); - for (Pair<String, DLSN> pair : subscriptions) { - subscriptionMap.put(pair.getLeft(), pair.getRight()); - } - result.setValue(subscriptionMap); - return BoxedUnit.UNIT; - } - }); + FutureUtils.collect(futures).thenAccept((subscriptions) -> { + Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>(); + for (Pair<String, DLSN> pair : subscriptions) { + subscriptionMap.put(pair.getLeft(), pair.getRight()); + } + result.complete(subscriptionMap); + }); } @Override - public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) { + public CompletableFuture<Void> advanceCommitPosition(String subscriberId, DLSN newPosition) { return getSubscriber(subscriberId).advanceCommitPosition(newPosition); } @Override - public Future<Boolean> deleteSubscriber(String subscriberId) { + public CompletableFuture<Boolean> deleteSubscriber(String subscriberId) { subscribers.remove(subscriberId); String path = getSubscriberZKPath(subscriberId); return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1)); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java deleted file mode 100644 index b2b430d..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.io; - -import java.io.IOException; - -/** - * An {@code Abortable} is a source or destination of data that can be aborted. - * The abort method is invoked to release resources that the object is holding - * (such as open files). The abort happens when the object is in an error state, - * which it couldn't be closed gracefully. - * - * @see java.io.Closeable - * @since 0.3.32 - */ -public interface Abortable { - - /** - * Aborts the object and releases any resources associated with it. - * If the object is already aborted then invoking this method has no - * effect. - * - * @throws IOException if an I/O error occurs. - */ - public void abort() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java deleted file mode 100644 index a4838b1..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.io; - -import com.google.common.collect.Lists; -import org.apache.distributedlog.function.VoidFunctions; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; - -/** - * Utility methods for working with {@link Abortable} objects. - * - * @since 0.3.32 - */ -public final class Abortables { - - static final Logger logger = LoggerFactory.getLogger(Abortables.class); - - private Abortables() {} - - public static Future<Void> asyncAbort(@Nullable AsyncAbortable abortable, - boolean swallowIOException) { - if (null == abortable) { - return Future.Void(); - } else if (swallowIOException) { - return FutureUtils.ignore(abortable.asyncAbort()); - } else { - return abortable.asyncAbort(); - } - } - - /** - * Aborts a {@link Abortable}, with control over whether an {@link IOException} may be thrown. - * This is primarily useful in a finally block, where a thrown exception needs to be logged but - * not propagated (otherwise the original exception will be lost). - * - * <p>If {@code swallowIOException} is true then we never throw {@code IOException} but merely log it. - * - * <p>Example: <pre> {@code - * - * public void abortStreamNicely() throws IOException { - * SomeStream stream = new SomeStream("foo"); - * try { - * // ... code which does something with the stream ... - * } catch (IOException ioe) { - * // If an exception occurs, we might abort the stream. - * Abortables.abort(stream, true); - * } - * }}</pre> - * - * @param abortable the {@code Abortable} object to be aborted, or null, in which case this method - * does nothing. - * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods - * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException} - */ - public static void abort(@Nullable Abortable abortable, - boolean swallowIOException) - throws IOException { - if (null == abortable) { - return; - } - try { - abortable.abort(); - } catch (IOException ioe) { - if (swallowIOException) { - logger.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe); - } else { - throw ioe; - } - } - } - - /** - * Abort async <i>abortable</i> - * - * @param abortable the {@code AsyncAbortable} object to be aborted, or null, in which case this method - * does nothing. - * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods - * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException} - * @see #abort(Abortable, boolean) - */ - public static void abort(@Nullable AsyncAbortable abortable, - boolean swallowIOException) - throws IOException { - if (null == abortable) { - return; - } - try { - FutureUtils.result(abortable.asyncAbort()); - } catch (IOException ioe) { - if (swallowIOException) { - logger.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe); - } else { - throw ioe; - } - } - } - - /** - * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than - * propagating it. - * - * While it's not safe in the general case to ignore exceptions that are thrown when aborting an - * I/O resource, it should generally be safe in the case of a resource that's being used only for - * reading. - * - * @param abortable the {@code Abortable} to be closed, or {@code null} in which case this method - * does nothing. - */ - public static void abortQuietly(@Nullable Abortable abortable) { - try { - abort(abortable, true); - } catch (IOException e) { - logger.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e); - } - } - - /** - * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than - * propagating it. - * - * While it's not safe in the general case to ignore exceptions that are thrown when aborting an - * I/O resource, it should generally be safe in the case of a resource that's being used only for - * reading. - * - * @param abortable the {@code AsyncAbortable} to be closed, or {@code null} in which case this method - * does nothing. - */ - public static void abortQuietly(@Nullable AsyncAbortable abortable) { - try { - abort(abortable, true); - } catch (IOException e) { - logger.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e); - } - } - - /** - * Abort the abortables in sequence. - * - * @param executorService - * executor service to execute - * @param abortables - * abortables to abort - * @return future represents the abort future - */ - public static Future<Void> abortSequence(ExecutorService executorService, - AsyncAbortable... abortables) { - List<AsyncAbortable> abortableList = Lists.newArrayListWithExpectedSize(abortables.length); - for (AsyncAbortable abortable : abortables) { - if (null == abortable) { - abortableList.add(AsyncAbortable.NULL); - } else { - abortableList.add(abortable); - } - } - return FutureUtils.processList( - abortableList, - AsyncAbortable.ABORT_FUNC, - executorService).map(VoidFunctions.LIST_TO_VOID_FUNC); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java deleted file mode 100644 index 7ec26a1..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.io; - -import com.twitter.util.Function; -import com.twitter.util.Future; - -/** - * An {@code Abortable} is a source or destination of data that can be aborted. - * The abort method is invoked to release resources that the object is holding - * (such as open files). The abort happens when the object is in an error state, - * which it couldn't be closed gracefully. - * - * @see AsyncCloseable - * @see Abortable - * @since 0.3.43 - */ -public interface AsyncAbortable { - - Function<AsyncAbortable, Future<Void>> ABORT_FUNC = new Function<AsyncAbortable, Future<Void>>() { - @Override - public Future<Void> apply(AsyncAbortable abortable) { - return abortable.asyncAbort(); - } - }; - - AsyncAbortable NULL = new AsyncAbortable() { - @Override - public Future<Void> asyncAbort() { - return Future.Void(); - } - }; - - /** - * Aborts the object and releases any resources associated with it. - * If the object is already aborted then invoking this method has no - * effect. - * - * @return future represents the abort result - */ - Future<Void> asyncAbort(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java deleted file mode 100644 index 2bf0119..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.io; - -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Function; -import com.twitter.util.Future; - -/** - * A {@code AsyncCloseable} is a source or destination of data that can be closed asynchronously. - * The close method is invoked to release resources that the object is - * holding (such as open files). - */ -public interface AsyncCloseable { - - Function<AsyncCloseable, Future<Void>> CLOSE_FUNC = new Function<AsyncCloseable, Future<Void>>() { - @Override - public Future<Void> apply(AsyncCloseable closeable) { - return closeable.asyncClose(); - } - }; - - Function<AsyncCloseable, Future<Void>> CLOSE_FUNC_IGNORE_ERRORS = new Function<AsyncCloseable, Future<Void>>() { - @Override - public Future<Void> apply(AsyncCloseable closeable) { - return FutureUtils.ignore(closeable.asyncClose()); - } - }; - - AsyncCloseable NULL = new AsyncCloseable() { - @Override - public Future<Void> asyncClose() { - return Future.Void(); - } - }; - - /** - * Closes this source and releases any system resources associated - * with it. If the source is already closed then invoking this - * method has no effect. - * - * @return future representing the close result. - */ - Future<Void> asyncClose(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java deleted file mode 100644 index 046c731..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.io; - -import com.twitter.util.Future; - -/** - * A {@code AsyncDeleteable} is a source or destination of data that can be deleted asynchronously. - * This delete method is invoked to delete the source. - */ -public interface AsyncDeleteable { - /** - * Releases any system resources associated with this and delete the source. If the source is - * already deleted then invoking this method has no effect. - * - * @return future representing the deletion result. - */ - Future<Void> delete(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java deleted file mode 100644 index eb81cfe..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * IO Utils for distributedlog - */ -package org.apache.distributedlog.io; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java index 95165ef..ae01bf7 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java @@ -17,16 +17,10 @@ */ package org.apache.distributedlog.limiter; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - -import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.limiter.GuavaRateLimiter; -import org.apache.distributedlog.limiter.RateLimiter; - import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; - +import org.apache.distributedlog.exceptions.OverCapacityException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java index 986678c..156d6dd 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java @@ -17,9 +17,9 @@ */ package org.apache.distributedlog.lock; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; /** * Interface for distributed locking @@ -31,7 +31,7 @@ public interface DistributedLock extends AsyncCloseable { * * @return future represents the acquire result. */ - Future<? extends DistributedLock> asyncAcquire(); + CompletableFuture<? extends DistributedLock> asyncAcquire(); /** * Check if hold lock. If it doesn't, then re-acquire the lock. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java index b70098e..1cb3364 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java @@ -17,10 +17,13 @@ */ package org.apache.distributedlog.lock; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import com.twitter.util.Timer; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,11 +36,11 @@ public class LockWaiter { private final String lockId; private final String currentOwner; - private final Future<Boolean> acquireFuture; + private final CompletableFuture<Boolean> acquireFuture; public LockWaiter(String lockId, String currentOwner, - Future<Boolean> acquireFuture) { + CompletableFuture<Boolean> acquireFuture) { this.lockId = lockId; this.currentOwner = currentOwner; this.acquireFuture = acquireFuture; @@ -64,12 +67,13 @@ public class LockWaiter { /** * Return the future representing the waiting result. * - * <p>If the future is interrupted (e.g. {@link Future#within(Duration, Timer)}), + * <p>If the future is interrupted + * (e.g. {@link FutureUtils#within(CompletableFuture, long, TimeUnit, Throwable, OrderedScheduler, Object)}), * the waiter will automatically clean up its waiting state. * * @return the future representing the acquire result. */ - public Future<Boolean> getAcquireFuture() { + public CompletableFuture<Boolean> getAcquireFuture() { return acquireFuture; } @@ -81,12 +85,12 @@ public class LockWaiter { public boolean waitForAcquireQuietly() { boolean success = false; try { - success = Await.result(acquireFuture); - } catch (InterruptedException ie) { + success = Utils.ioResult(acquireFuture); + } catch (DLInterruptedException ie) { Thread.currentThread().interrupt(); } catch (LockTimeoutException lte) { logger.debug("Timeout on lock acquiring", lte); - } catch (Exception e) { + } catch (IOException e) { logger.error("Caught exception waiting for lock acquired", e); } return success; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java index 88abffa..7f770ad 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java @@ -17,8 +17,9 @@ */ package org.apache.distributedlog.lock; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.exceptions.LockingException; -import com.twitter.util.Future; +import org.apache.distributedlog.common.concurrent.FutureUtils; /** * An implementation of {@link DistributedLock} which does nothing. @@ -30,8 +31,8 @@ public class NopDistributedLock implements DistributedLock { private NopDistributedLock() {} @Override - public Future<? extends DistributedLock> asyncAcquire() { - return Future.value(this); + public CompletableFuture<? extends DistributedLock> asyncAcquire() { + return FutureUtils.value(this); } @Override @@ -45,7 +46,7 @@ public class NopDistributedLock implements DistributedLock { } @Override - public Future<Void> asyncClose() { - return Future.Void(); + public CompletableFuture<Void> asyncClose() { + return FutureUtils.Void(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java index 8aec2c0..3a46a13 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java @@ -17,12 +17,10 @@ */ package org.apache.distributedlog.lock; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; -import com.twitter.util.Future; -import scala.runtime.BoxedUnit; - -import java.util.concurrent.TimeUnit; /** * One time lock. @@ -71,7 +69,7 @@ public interface SessionLock { * <i>tryLock</i> here is effectively the combination of following asynchronous calls. * <pre> * ZKDistributedLock lock = ...; - * Future<LockWaiter> attemptFuture = lock.asyncTryLock(...); + * CompletableFuture<LockWaiter> attemptFuture = lock.asyncTryLock(...); * * boolean acquired = waiter.waitForAcquireQuietly(); * if (acquired) { @@ -106,7 +104,7 @@ public interface SessionLock { * @return lock waiter representing this attempt of acquiring lock. * @see #tryLock(long, TimeUnit) */ - Future<LockWaiter> asyncTryLock(long timeout, TimeUnit unit); + CompletableFuture<LockWaiter> asyncTryLock(long timeout, TimeUnit unit); /** * Release a claimed lock. @@ -121,6 +119,6 @@ public interface SessionLock { * @return future representing the result of unlock operation. * @see #unlock() */ - Future<BoxedUnit> asyncUnlock(); + CompletableFuture<Void> asyncUnlock(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java index a68f2d8..9d3159e 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog.lock; -import com.twitter.util.Future; +import java.util.concurrent.CompletableFuture; /** * Factory to create {@link SessionLock} @@ -33,6 +33,6 @@ public interface SessionLockFactory { * lock context * @return future represents the creation result. */ - Future<SessionLock> createLock(String lockPath, DistributedLockContext context); + CompletableFuture<SessionLock> createLock(String lockPath, DistributedLockContext context); }