http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
index 4dd4c12..14ebf4a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.distributedlog.impl.logsegment;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LogSegmentMetadata;
@@ -35,10 +36,8 @@ import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.Allocator;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -65,13 +64,13 @@ public class BKLogSegmentEntryStore implements
 
         private final LogSegmentMetadata segment;
         private final long startEntryId;
-        private final Promise<LogSegmentEntryReader> openPromise;
+        private final CompletableFuture<LogSegmentEntryReader> openPromise;
 
         OpenReaderRequest(LogSegmentMetadata segment,
                           long startEntryId) {
             this.segment = segment;
             this.startEntryId = startEntryId;
-            this.openPromise = new Promise<LogSegmentEntryReader>();
+            this.openPromise = new CompletableFuture<LogSegmentEntryReader>();
         }
 
     }
@@ -79,11 +78,11 @@ public class BKLogSegmentEntryStore implements
     private static class DeleteLogSegmentRequest {
 
         private final LogSegmentMetadata segment;
-        private final Promise<LogSegmentMetadata> deletePromise;
+        private final CompletableFuture<LogSegmentMetadata> deletePromise;
 
         DeleteLogSegmentRequest(LogSegmentMetadata segment) {
             this.segment = segment;
-            this.deletePromise = new Promise<LogSegmentMetadata>();
+            this.deletePromise = new CompletableFuture<LogSegmentMetadata>();
         }
 
     }
@@ -119,13 +118,13 @@ public class BKLogSegmentEntryStore implements
     }
 
     @Override
-    public Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata 
segment) {
+    public CompletableFuture<LogSegmentMetadata> 
deleteLogSegment(LogSegmentMetadata segment) {
         DeleteLogSegmentRequest request = new DeleteLogSegmentRequest(segment);
         BookKeeper bk;
         try {
             bk = this.bkc.get();
         } catch (IOException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         }
         bk.asyncDeleteLedger(segment.getLogSegmentId(), this, request);
         return request.deletePromise;
@@ -141,11 +140,11 @@ public class BKLogSegmentEntryStore implements
             logger.error("Couldn't delete ledger {} from bookkeeper for {} : 
{}",
                     new Object[]{ deleteRequest.segment.getLogSegmentId(), 
deleteRequest.segment,
                             BKException.getMessage(rc) });
-            FutureUtils.setException(deleteRequest.deletePromise,
+            FutureUtils.completeExceptionally(deleteRequest.deletePromise,
                     new BKTransmitException("Couldn't delete log segment " + 
deleteRequest.segment, rc));
             return;
         }
-        FutureUtils.setValue(deleteRequest.deletePromise, 
deleteRequest.segment);
+        FutureUtils.complete(deleteRequest.deletePromise, 
deleteRequest.segment);
     }
 
     //
@@ -186,13 +185,13 @@ public class BKLogSegmentEntryStore implements
     //
 
     @Override
-    public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
+    public CompletableFuture<LogSegmentEntryReader> 
openReader(LogSegmentMetadata segment,
                                                     long startEntryId) {
         BookKeeper bk;
         try {
             bk = this.bkc.get();
         } catch (IOException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         }
         OpenReaderRequest request = new OpenReaderRequest(segment, 
startEntryId);
         if (segment.isInProgress()) {
@@ -217,7 +216,7 @@ public class BKLogSegmentEntryStore implements
     public void openComplete(int rc, LedgerHandle lh, Object ctx) {
         OpenReaderRequest request = (OpenReaderRequest) ctx;
         if (BKException.Code.OK != rc) {
-            FutureUtils.setException(
+            FutureUtils.completeExceptionally(
                     request.openPromise,
                     new BKTransmitException("Failed to open ledger handle for 
log segment " + request.segment, rc));
             return;
@@ -233,28 +232,28 @@ public class BKLogSegmentEntryStore implements
                     conf,
                     statsLogger,
                     failureInjector);
-            FutureUtils.setValue(request.openPromise, reader);
+            FutureUtils.complete(request.openPromise, reader);
         } catch (IOException e) {
-            FutureUtils.setException(request.openPromise, e);
+            FutureUtils.completeExceptionally(request.openPromise, e);
         }
 
     }
 
     @Override
-    public Future<LogSegmentRandomAccessEntryReader> 
openRandomAccessReader(final LogSegmentMetadata segment,
+    public CompletableFuture<LogSegmentRandomAccessEntryReader> 
openRandomAccessReader(final LogSegmentMetadata segment,
                                                                             
final boolean fence) {
         final BookKeeper bk;
         try {
             bk = this.bkc.get();
         } catch (IOException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         }
-        final Promise<LogSegmentRandomAccessEntryReader> openPromise = new 
Promise<LogSegmentRandomAccessEntryReader>();
+        final CompletableFuture<LogSegmentRandomAccessEntryReader> openPromise 
= new CompletableFuture<LogSegmentRandomAccessEntryReader>();
         AsyncCallback.OpenCallback openCallback = new 
AsyncCallback.OpenCallback() {
             @Override
             public void openComplete(int rc, LedgerHandle lh, Object ctx) {
                 if (BKException.Code.OK != rc) {
-                    FutureUtils.setException(
+                    FutureUtils.completeExceptionally(
                             openPromise,
                             new BKTransmitException("Failed to open ledger 
handle for log segment " + segment, rc));
                     return;
@@ -263,7 +262,7 @@ public class BKLogSegmentEntryStore implements
                         segment,
                         lh,
                         conf);
-                FutureUtils.setValue(openPromise, reader);
+                FutureUtils.complete(openPromise, reader);
             }
         };
         if (segment.isInProgress() && !fence) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
index d7b331b..254345e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
@@ -18,14 +18,13 @@
 package org.apache.distributedlog.impl.logsegment;
 
 import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.Entry;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.exceptions.BKTransmitException;
 import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -49,7 +48,7 @@ class BKLogSegmentRandomAccessEntryReader implements
     // state
     private final LogSegmentMetadata metadata;
     private final LedgerHandle lh;
-    private Promise<Void> closePromise = null;
+    private CompletableFuture<Void> closePromise = null;
 
     BKLogSegmentRandomAccessEntryReader(LogSegmentMetadata metadata,
                                         LedgerHandle lh,
@@ -68,8 +67,8 @@ class BKLogSegmentRandomAccessEntryReader implements
     }
 
     @Override
-    public Future<List<Entry.Reader>> readEntries(long startEntryId, long 
endEntryId) {
-        Promise<List<Entry.Reader>> promise = new 
Promise<List<Entry.Reader>>();
+    public CompletableFuture<List<Entry.Reader>> readEntries(long 
startEntryId, long endEntryId) {
+        CompletableFuture<List<Entry.Reader>> promise = new 
CompletableFuture<List<Entry.Reader>>();
         lh.asyncReadEntries(startEntryId, endEntryId, this, promise);
         return promise;
     }
@@ -86,34 +85,37 @@ class BKLogSegmentRandomAccessEntryReader implements
 
     @Override
     public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> 
entries, Object ctx) {
-        Promise<List<Entry.Reader>> promise = (Promise<List<Entry.Reader>>) 
ctx;
+        CompletableFuture<List<Entry.Reader>> promise = 
(CompletableFuture<List<Entry.Reader>>) ctx;
         if (BKException.Code.OK == rc) {
             List<Entry.Reader> entryList = Lists.newArrayList();
             while (entries.hasMoreElements()) {
                 try {
                     entryList.add(processReadEntry(entries.nextElement()));
                 } catch (IOException ioe) {
-                    FutureUtils.setException(promise, ioe);
+                    FutureUtils.completeExceptionally(promise, ioe);
                     return;
                 }
             }
-            FutureUtils.setValue(promise, entryList);
+            FutureUtils.complete(promise, entryList);
         } else {
-            FutureUtils.setException(promise,
+            FutureUtils.completeExceptionally(promise,
                     new BKTransmitException("Failed to read entries :", rc));
         }
     }
 
     @Override
-    public Future<Void> asyncClose() {
-        final Promise<Void> closeFuture;
+    public CompletableFuture<Void> asyncClose() {
+        final CompletableFuture<Void> closeFuture;
         synchronized (this) {
             if (null != closePromise) {
                 return closePromise;
             }
-            closeFuture = closePromise = new Promise<Void>();
+            closeFuture = closePromise = new CompletableFuture<Void>();
         }
-        BKUtils.closeLedgers(lh).proxyTo(closeFuture);
+        FutureUtils.proxyTo(
+            BKUtils.closeLedgers(lh),
+            closeFuture
+        );
         return closeFuture;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java
index 3c02740..82ba775 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java
@@ -18,11 +18,9 @@
 package org.apache.distributedlog.impl.logsegment;
 
 import com.google.common.collect.Lists;
-import org.apache.distributedlog.function.VoidFunctions;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import com.twitter.util.Promise;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.common.functions.VoidFunctions;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -40,15 +38,15 @@ public class BKUtils {
      * @param lh ledger handle
      * @return future represents close result.
      */
-    public static Future<Void> closeLedger(LedgerHandle lh) {
-        final Promise<Void> closePromise = new Promise<Void>();
+    public static CompletableFuture<Void> closeLedger(LedgerHandle lh) {
+        final CompletableFuture<Void> closePromise = new 
CompletableFuture<Void>();
         lh.asyncClose(new AsyncCallback.CloseCallback() {
             @Override
             public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
                 if (BKException.Code.OK != rc) {
-                    FutureUtils.setException(closePromise, 
BKException.create(rc));
+                    FutureUtils.completeExceptionally(closePromise, 
BKException.create(rc));
                 } else {
-                    FutureUtils.setValue(closePromise, null);
+                    FutureUtils.complete(closePromise, null);
                 }
             }
         }, null);
@@ -61,12 +59,12 @@ public class BKUtils {
      * @param lhs a list of ledgers
      * @return future represents close results.
      */
-    public static Future<Void> closeLedgers(LedgerHandle ... lhs) {
-        List<Future<Void>> closeResults = 
Lists.newArrayListWithExpectedSize(lhs.length);
+    public static CompletableFuture<Void> closeLedgers(LedgerHandle ... lhs) {
+        List<CompletableFuture<Void>> closeResults = 
Lists.newArrayListWithExpectedSize(lhs.length);
         for (LedgerHandle lh : lhs) {
             closeResults.add(closeLedger(lh));
         }
-        return 
Futures.collect(closeResults).map(VoidFunctions.LIST_TO_VOID_FUNC);
+        return 
FutureUtils.collect(closeResults).thenApply(VoidFunctions.LIST_TO_VOID_FUNC);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 30f9dd4..9b02462 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -20,10 +20,12 @@ package org.apache.distributedlog.impl.metadata;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLException;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.exceptions.LockCancelledException;
@@ -42,18 +44,14 @@ import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForReader;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.distributedlog.zk.LimitedPermitManager;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.ZKTransaction;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.ExceptionalFunction0;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -69,10 +67,7 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
@@ -120,14 +115,9 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
 
     private synchronized OrderedScheduler getLockStateExecutor(boolean 
createIfNull) {
         if (createIfNull && null == lockStateExecutor) {
-            StatsLogger lockStateStatsLogger = 
statsLogger.scope("lock_scheduler");
             lockStateExecutor = OrderedScheduler.newBuilder()
                     .name("DLM-LockState")
                     .corePoolSize(conf.getNumLockStateThreads())
-                    .statsLogger(lockStateStatsLogger)
-                    .perExecutorStatsLogger(lockStateStatsLogger)
-                    .traceTaskExecution(conf.getEnableTaskExecutionStats())
-                    
.traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
                     .build();
         }
         return lockStateExecutor;
@@ -174,21 +164,21 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
     }
 
     @Override
-    public Future<Void> logExists(URI uri, final String logName) {
+    public CompletableFuture<Void> logExists(URI uri, final String logName) {
         final String logSegmentsPath = LogMetadata.getLogSegmentsPath(
                 uri, logName, conf.getUnpartitionedStreamName());
-        final Promise<Void> promise = new Promise<Void>();
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
         try {
             final ZooKeeper zk = zooKeeperClient.get();
             zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
                 @Override
                 public void processResult(int syncRc, String path, Object 
syncCtx) {
                     if (KeeperException.Code.NONODE.intValue() == syncRc) {
-                        promise.setException(new LogNotFoundException(
+                        promise.completeExceptionally(new LogNotFoundException(
                                 String.format("Log %s does not exist or has 
been deleted", logName)));
                         return;
                     } else if (KeeperException.Code.OK.intValue() != syncRc){
-                        promise.setException(new ZKException("Error on 
checking log existence for " + logName,
+                        promise.completeExceptionally(new ZKException("Error 
on checking log existence for " + logName,
                                 
KeeperException.create(KeeperException.Code.get(syncRc))));
                         return;
                     }
@@ -196,12 +186,12 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
                         @Override
                         public void processResult(int rc, String path, Object 
ctx, Stat stat) {
                             if (KeeperException.Code.OK.intValue() == rc) {
-                                promise.setValue(null);
+                                promise.complete(null);
                             } else if (KeeperException.Code.NONODE.intValue() 
== rc) {
-                                promise.setException(new LogNotFoundException(
+                                promise.completeExceptionally(new 
LogNotFoundException(
                                         String.format("Log %s does not exist 
or has been deleted", logName)));
                             } else {
-                                promise.setException(new ZKException("Error on 
checking log existence for " + logName,
+                                promise.completeExceptionally(new 
ZKException("Error on checking log existence for " + logName,
                                         
KeeperException.create(KeeperException.Code.get(rc))));
                             }
                         }
@@ -211,10 +201,10 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
 
         } catch (InterruptedException ie) {
             LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
-            promise.setException(new DLInterruptedException("Interrupted while 
checking "
+            promise.completeExceptionally(new 
DLInterruptedException("Interrupted while checking "
                     + logSegmentsPath, ie));
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
         return promise;
     }
@@ -237,15 +227,13 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
     // Create Read Lock
     //
 
-    private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
+    private CompletableFuture<Void> ensureReadLockPathExist(final LogMetadata 
logMetadata,
                                                  final String readLockPath) {
-        final Promise<Void> promise = new Promise<Void>();
-        promise.setInterruptHandler(new com.twitter.util.Function<Throwable, 
BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                FutureUtils.setException(promise, new 
LockCancelledException(readLockPath,
-                        "Could not ensure read lock path", t));
-                return null;
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
+        promise.whenComplete((value, cause) -> {
+            if (cause instanceof CancellationException) {
+                FutureUtils.completeExceptionally(promise, new 
LockCancelledException(readLockPath,
+                        "Could not ensure read lock path", cause));
             }
         });
         Optional<String> parentPathShouldNotCreate = 
Optional.of(logMetadata.getLogRootPath());
@@ -255,21 +243,21 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
                     @Override
                     public void processResult(final int rc, final String path, 
Object ctx, String name) {
                         if (KeeperException.Code.NONODE.intValue() == rc) {
-                            FutureUtils.setException(promise, new 
LogNotFoundException(
+                            FutureUtils.completeExceptionally(promise, new 
LogNotFoundException(
                                     String.format("Log %s does not exist or 
has been deleted",
                                             
logMetadata.getFullyQualifiedName())));
                         } else if (KeeperException.Code.OK.intValue() == rc) {
-                            FutureUtils.setValue(promise, null);
+                            FutureUtils.complete(promise, null);
                             LOG.trace("Created path {}.", path);
                         } else if (KeeperException.Code.NODEEXISTS.intValue() 
== rc) {
-                            FutureUtils.setValue(promise, null);
+                            FutureUtils.complete(promise, null);
                             LOG.trace("Path {} is already existed.", path);
                         } else if 
(DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
-                            FutureUtils.setException(promise, new 
ZooKeeperClient.ZooKeeperConnectionException(path));
+                            FutureUtils.completeExceptionally(promise, new 
ZooKeeperClient.ZooKeeperConnectionException(path));
                         } else if 
(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
-                            FutureUtils.setException(promise, new 
DLInterruptedException(path));
+                            FutureUtils.completeExceptionally(promise, new 
DLInterruptedException(path));
                         } else {
-                            FutureUtils.setException(promise, 
KeeperException.create(KeeperException.Code.get(rc)));
+                            FutureUtils.completeExceptionally(promise, 
KeeperException.create(KeeperException.Code.get(rc)));
                         }
                     }
                 }, null);
@@ -277,28 +265,19 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
     }
 
     @Override
-    public Future<DistributedLock> createReadLock(final LogMetadataForReader 
metadata,
+    public CompletableFuture<DistributedLock> createReadLock(final 
LogMetadataForReader metadata,
                                                   Optional<String> readerId) {
         final String readLockPath = metadata.getReadLockPath(readerId);
-        return ensureReadLockPathExist(metadata, readLockPath).flatMap(
-                new ExceptionalFunction<Void, Future<DistributedLock>>() {
-            @Override
-            public Future<DistributedLock> applyE(Void value) throws Throwable 
{
-                // Unfortunately this has a blocking call which we should not 
execute on the
-                // ZK completion thread
-                return scheduler.apply(new 
ExceptionalFunction0<DistributedLock>() {
-                    @Override
-                    public DistributedLock applyE() throws Throwable {
-                        return new ZKDistributedLock(
-                            getLockStateExecutor(true),
-                            getLockFactory(true),
-                            readLockPath,
-                            conf.getLockTimeoutMilliSeconds(),
-                            statsLogger.scope("read_lock"));
-                    }
-                });
-            }
-        });
+        return ensureReadLockPathExist(metadata, readLockPath)
+            .thenApplyAsync((value) -> {
+                DistributedLock lock = new ZKDistributedLock(
+                    getLockStateExecutor(true),
+                    getLockFactory(true),
+                    readLockPath,
+                    conf.getLockTimeoutMilliSeconds(),
+                    statsLogger.scope("read_lock"));
+                return lock;
+            }, scheduler.chooseExecutor(readLockPath));
     }
 
     //
@@ -329,7 +308,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
             (byte) (i)};
     }
 
-    static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
+    static CompletableFuture<List<Versioned<byte[]>>> 
checkLogMetadataPaths(ZooKeeper zk,
                                                                  String 
logRootPath,
                                                                  boolean 
ownAllocator) {
         // Note re. persistent lock state initialization: the read lock 
persistent state (path) is
@@ -344,7 +323,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
         final String allocationPath = logRootPath + ALLOCATION_PATH;
 
         int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : 
MetadataIndex.LOGSEGMENTS + 1;
-        List<Future<Versioned<byte[]>>> checkFutures = 
Lists.newArrayListWithExpectedSize(numPaths);
+        List<CompletableFuture<Versioned<byte[]>>> checkFutures = 
Lists.newArrayListWithExpectedSize(numPaths);
         checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
         checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
         checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
@@ -356,7 +335,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
             checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
         }
 
-        return Future.collect(checkFutures);
+        return FutureUtils.collect(checkFutures);
     }
 
     static boolean pathExists(Versioned<byte[]> metadata) {
@@ -374,7 +353,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
                                       final List<ACL> acl,
                                       final boolean ownAllocator,
                                       final boolean createIfNotExists,
-                                      final Promise<List<Versioned<byte[]>>> 
promise) {
+                                      final 
CompletableFuture<List<Versioned<byte[]>>> promise) {
         final List<byte[]> pathsToCreate = 
Lists.newArrayListWithExpectedSize(metadatas.size());
         final List<Op> zkOps = 
Lists.newArrayListWithExpectedSize(metadatas.size());
         CreateMode createMode = CreateMode.PERSISTENT;
@@ -447,11 +426,11 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
         }
         if (zkOps.isEmpty()) {
             // nothing missed
-            promise.setValue(metadatas);
+            promise.complete(metadatas);
             return;
         }
         if (!createIfNotExists) {
-            promise.setException(new LogNotFoundException("Log " + logRootPath 
+ " not found"));
+            promise.completeExceptionally(new LogNotFoundException("Log " + 
logRootPath + " not found"));
             return;
         }
 
@@ -469,9 +448,9 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
                             finalMetadatas.add(new 
Versioned<byte[]>(dataCreated, new ZkVersion(0)));
                         }
                     }
-                    promise.setValue(finalMetadatas);
+                    promise.complete(finalMetadatas);
                 } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                    promise.setException(new LogExistsException("Someone just 
created log "
+                    promise.completeExceptionally(new 
LogExistsException("Someone just created log "
                             + logRootPath));
                 } else {
                     if (LOG.isDebugEnabled()) {
@@ -488,7 +467,7 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
                         LOG.debug("Failed to create log, full rc list = {}", 
resultCodeList);
                     }
 
-                    promise.setException(new ZKException("Failed to create log 
" + logRootPath,
+                    promise.completeExceptionally(new ZKException("Failed to 
create log " + logRootPath,
                             KeeperException.Code.get(rc)));
                 }
             }
@@ -538,7 +517,7 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
         }
     }
 
-    static Future<LogMetadataForWriter> getLog(final URI uri,
+    static CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
                                                final String logName,
                                                final String logIdentifier,
                                                final ZooKeeperClient 
zooKeeperClient,
@@ -549,42 +528,47 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
             PathUtils.validatePath(logRootPath);
         } catch (IllegalArgumentException e) {
             LOG.error("Illegal path value {} for stream {}", new 
Object[]{logRootPath, logName, e});
-            return Future.exception(new InvalidStreamNameException(logName, 
"Log name is invalid"));
+            return FutureUtils.exception(new 
InvalidStreamNameException(logName, "Log name is invalid"));
         }
 
         try {
             final ZooKeeper zk = zooKeeperClient.get();
             return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
-                    .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, 
Future<List<Versioned<byte[]>>>>() {
+                    .thenCompose(new Function<List<Versioned<byte[]>>, 
CompletableFuture<List<Versioned<byte[]>>>>() {
                         @Override
-                        public Future<List<Versioned<byte[]>>> 
apply(List<Versioned<byte[]>> metadatas) {
-                            Promise<List<Versioned<byte[]>>> promise =
-                                    new Promise<List<Versioned<byte[]>>>();
+                        public CompletableFuture<List<Versioned<byte[]>>> 
apply(List<Versioned<byte[]>> metadatas) {
+                            CompletableFuture<List<Versioned<byte[]>>> promise 
=
+                                    new 
CompletableFuture<List<Versioned<byte[]>>>();
                             createMissingMetadata(zk, logRootPath, metadatas, 
zooKeeperClient.getDefaultACL(),
                                     ownAllocator, createIfNotExists, promise);
                             return promise;
                         }
-                    }).map(new ExceptionalFunction<List<Versioned<byte[]>>, 
LogMetadataForWriter>() {
+                    }).thenCompose(new Function<List<Versioned<byte[]>>, 
CompletableFuture<LogMetadataForWriter>>() {
                         @Override
-                        public LogMetadataForWriter 
applyE(List<Versioned<byte[]>> metadatas) throws DLException {
-                            return processLogMetadatas(
-                                    uri,
-                                    logName,
-                                    logIdentifier,
-                                    metadatas,
-                                    ownAllocator);
+                        public CompletableFuture<LogMetadataForWriter> 
apply(List<Versioned<byte[]>> metadatas) {
+                            try {
+                                return FutureUtils.value(
+                                    processLogMetadatas(
+                                        uri,
+                                        logName,
+                                        logIdentifier,
+                                        metadatas,
+                                        ownAllocator));
+                            } catch (UnexpectedException e) {
+                                return FutureUtils.exception(e);
+                            }
                         }
                     });
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(new ZKException("Encountered zookeeper 
connection issue on creating log " + logName,
+            return FutureUtils.exception(new ZKException("Encountered 
zookeeper connection issue on creating log " + logName,
                     KeeperException.Code.CONNECTIONLOSS));
         } catch (InterruptedException e) {
-            return Future.exception(new DLInterruptedException("Interrupted on 
creating log " + logName, e));
+            return FutureUtils.exception(new 
DLInterruptedException("Interrupted on creating log " + logName, e));
         }
     }
 
     @Override
-    public Future<LogMetadataForWriter> getLog(final URI uri,
+    public CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
                                                final String logName,
                                                final boolean ownAllocator,
                                                final boolean 
createIfNotExists) {
@@ -602,30 +586,30 @@ public class ZKLogStreamMetadataStore implements 
LogStreamMetadataStore {
     //
 
     @Override
-    public Future<Void> deleteLog(URI uri, final String logName) {
-        final Promise<Void> promise = new Promise<Void>();
+    public CompletableFuture<Void> deleteLog(URI uri, final String logName) {
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
         try {
             String streamPath = LogMetadata.getLogStreamPath(uri, logName);
             ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new 
AsyncCallback.VoidCallback() {
                 @Override
                 public void processResult(int rc, String path, Object ctx) {
                     if (KeeperException.Code.OK.intValue() != rc) {
-                        FutureUtils.setException(promise,
+                        FutureUtils.completeExceptionally(promise,
                                 new ZKException("Encountered zookeeper issue 
on deleting log stream "
                                         + logName, 
KeeperException.Code.get(rc)));
                         return;
                     }
-                    FutureUtils.setValue(promise, null);
+                    FutureUtils.complete(promise, null);
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            FutureUtils.setException(promise, new ZKException("Encountered 
zookeeper issue on deleting log stream "
+            FutureUtils.completeExceptionally(promise, new 
ZKException("Encountered zookeeper issue on deleting log stream "
                     + logName, KeeperException.Code.CONNECTIONLOSS));
         } catch (InterruptedException e) {
-            FutureUtils.setException(promise, new 
DLInterruptedException("Interrupted while deleting log stream "
+            FutureUtils.completeExceptionally(promise, new 
DLInterruptedException("Interrupted while deleting log stream "
                     + logName));
         } catch (KeeperException e) {
-            FutureUtils.setException(promise, new ZKException("Encountered 
zookeeper issue on deleting log stream "
+            FutureUtils.completeExceptionally(promise, new 
ZKException("Encountered zookeeper issue on deleting log stream "
                     + logName, e));
         }
         return promise;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
index 64abb77..302c666 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
@@ -17,27 +17,22 @@
  */
 package org.apache.distributedlog.impl.subscription;
 
+import com.google.common.base.Charsets;
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.distributedlog.subscription.SubscriptionStateStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-import com.google.common.base.Charsets;
-
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.api.subscription.SubscriptionStateStore;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
-
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.util.Utils;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZKSubscriptionStateStore implements SubscriptionStateStore {
 
@@ -60,16 +55,16 @@ public class ZKSubscriptionStateStore implements 
SubscriptionStateStore {
      * Get the last committed position stored for this subscription
      */
     @Override
-    public Future<DLSN> getLastCommitPosition() {
+    public CompletableFuture<DLSN> getLastCommitPosition() {
         if (null != lastCommittedPosition.get()) {
-            return Future.value(lastCommittedPosition.get());
+            return FutureUtils.value(lastCommittedPosition.get());
         } else {
             return getLastCommitPositionFromZK();
         }
     }
 
-    Future<DLSN> getLastCommitPositionFromZK() {
-        final Promise<DLSN> result = new Promise<DLSN>();
+    CompletableFuture<DLSN> getLastCommitPositionFromZK() {
+        final CompletableFuture<DLSN> result = new CompletableFuture<DLSN>();
         try {
             logger.debug("Reading last commit position from path {}", zkPath);
             zooKeeperClient.get().getData(zkPath, false, new 
AsyncCallback.DataCallback() {
@@ -77,25 +72,25 @@ public class ZKSubscriptionStateStore implements 
SubscriptionStateStore {
                 public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
                     logger.debug("Read last commit position from path {}: rc = 
{}", zkPath, rc);
                     if (KeeperException.Code.NONODE.intValue() == rc) {
-                        result.setValue(DLSN.NonInclusiveLowerBound);
+                        result.complete(DLSN.NonInclusiveLowerBound);
                     } else if (KeeperException.Code.OK.intValue() != rc) {
-                        
result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
+                        
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc),
 path));
                     } else {
                         try {
                             DLSN dlsn = DLSN.deserialize(new String(data, 
Charsets.UTF_8));
-                            result.setValue(dlsn);
+                            result.complete(dlsn);
                         } catch (Exception t) {
                             logger.warn("Invalid last commit position found 
from path {}", zkPath, t);
                             // invalid dlsn recorded in subscription state 
store
-                            result.setValue(DLSN.NonInclusiveLowerBound);
+                            result.complete(DLSN.NonInclusiveLowerBound);
                         }
                     }
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
-            result.setException(zkce);
+            result.completeExceptionally(zkce);
         } catch (InterruptedException ie) {
-            result.setException(new 
DLInterruptedException("getLastCommitPosition was interrupted", ie));
+            result.completeExceptionally(new 
DLInterruptedException("getLastCommitPosition was interrupted", ie));
         }
         return result;
     }
@@ -106,7 +101,7 @@ public class ZKSubscriptionStateStore implements 
SubscriptionStateStore {
      * @param newPosition - new commit position
      */
     @Override
-    public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) {
+    public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition) {
         if (null == lastCommittedPosition.get() ||
             (newPosition.compareTo(lastCommittedPosition.get()) > 0)) {
             lastCommittedPosition.set(newPosition);
@@ -115,7 +110,7 @@ public class ZKSubscriptionStateStore implements 
SubscriptionStateStore {
                 zooKeeperClient.getDefaultACL(),
                 CreateMode.PERSISTENT);
         } else {
-            return Future.Done();
+            return FutureUtils.Void();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
index d75f5fc..0392264 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
@@ -17,30 +17,26 @@
  */
 package org.apache.distributedlog.impl.subscription;
 
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.subscription.SubscriptionStateStore;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
-import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.api.subscription.SubscriptionStateStore;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * ZooKeeper Based Subscriptions Store.
@@ -82,72 +78,62 @@ public class ZKSubscriptionsStore implements 
SubscriptionsStore {
     }
 
     @Override
-    public Future<DLSN> getLastCommitPosition(String subscriberId) {
+    public CompletableFuture<DLSN> getLastCommitPosition(String subscriberId) {
         return getSubscriber(subscriberId).getLastCommitPosition();
     }
 
     @Override
-    public Future<Map<String, DLSN>> getLastCommitPositions() {
-        final Promise<Map<String, DLSN>> result = new Promise<Map<String, 
DLSN>>();
+    public CompletableFuture<Map<String, DLSN>> getLastCommitPositions() {
+        final CompletableFuture<Map<String, DLSN>> result = new 
CompletableFuture<Map<String, DLSN>>();
         try {
             this.zkc.get().getChildren(this.zkPath, false, new 
AsyncCallback.Children2Callback() {
                 @Override
                 public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
                     if (KeeperException.Code.NONODE.intValue() == rc) {
-                        result.setValue(new HashMap<String, DLSN>());
+                        result.complete(new HashMap<String, DLSN>());
                     } else if (KeeperException.Code.OK.intValue() != rc) {
-                        
result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
+                        
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc),
 path));
                     } else {
                         getLastCommitPositions(result, children);
                     }
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
-            result.setException(zkce);
+            result.completeExceptionally(zkce);
         } catch (InterruptedException ie) {
-            result.setException(new 
DLInterruptedException("getLastCommitPositions was interrupted", ie));
+            result.completeExceptionally(new 
DLInterruptedException("getLastCommitPositions was interrupted", ie));
         }
         return result;
     }
 
-    private void getLastCommitPositions(final Promise<Map<String, DLSN>> 
result,
+    private void getLastCommitPositions(final CompletableFuture<Map<String, 
DLSN>> result,
                                         List<String> subscribers) {
-        List<Future<Pair<String, DLSN>>> futures =
-                new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size());
+        List<CompletableFuture<Pair<String, DLSN>>> futures =
+                new ArrayList<CompletableFuture<Pair<String, 
DLSN>>>(subscribers.size());
         for (String s : subscribers) {
             final String subscriber = s;
-            Future<Pair<String, DLSN>> future =
+            CompletableFuture<Pair<String, DLSN>> future =
                 // Get the last commit position from zookeeper
-                getSubscriber(subscriber).getLastCommitPositionFromZK().map(
-                        new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
-                            @Override
-                            public Pair<String, DLSN> apply(DLSN dlsn) {
-                                return Pair.of(subscriber, dlsn);
-                            }
-                        });
+                
getSubscriber(subscriber).getLastCommitPositionFromZK().thenApply(
+                    dlsn -> Pair.of(subscriber, dlsn));
             futures.add(future);
         }
-        Future.collect(futures).foreach(
-            new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) 
{
-                    Map<String, DLSN> subscriptionMap = new HashMap<String, 
DLSN>();
-                    for (Pair<String, DLSN> pair : subscriptions) {
-                        subscriptionMap.put(pair.getLeft(), pair.getRight());
-                    }
-                    result.setValue(subscriptionMap);
-                    return BoxedUnit.UNIT;
-                }
-            });
+        FutureUtils.collect(futures).thenAccept((subscriptions) -> {
+            Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
+            for (Pair<String, DLSN> pair : subscriptions) {
+                subscriptionMap.put(pair.getLeft(), pair.getRight());
+            }
+            result.complete(subscriptionMap);
+        });
     }
 
     @Override
-    public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN 
newPosition) {
+    public CompletableFuture<Void> advanceCommitPosition(String subscriberId, 
DLSN newPosition) {
         return getSubscriber(subscriberId).advanceCommitPosition(newPosition);
     }
 
     @Override
-    public Future<Boolean> deleteSubscriber(String subscriberId) {
+    public CompletableFuture<Boolean> deleteSubscriber(String subscriberId) {
         subscribers.remove(subscriberId);
         String path = getSubscriberZKPath(subscriberId);
         return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java 
b/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java
deleted file mode 100644
index b2b430d..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import java.io.IOException;
-
-/**
- * An {@code Abortable} is a source or destination of data that can be aborted.
- * The abort method is invoked to release resources that the object is holding
- * (such as open files). The abort happens when the object is in an error 
state,
- * which it couldn't be closed gracefully.
- *
- * @see java.io.Closeable
- * @since 0.3.32
- */
-public interface Abortable {
-
-    /**
-     * Aborts the object and releases any resources associated with it.
-     * If the object is already aborted then invoking this method has no
-     * effect.
-     *
-     * @throws IOException if an I/O error occurs.
-     */
-    public void abort() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java
deleted file mode 100644
index a4838b1..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.function.VoidFunctions;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Utility methods for working with {@link Abortable} objects.
- *
- * @since 0.3.32
- */
-public final class Abortables {
-
-    static final Logger logger = LoggerFactory.getLogger(Abortables.class);
-
-    private Abortables() {}
-
-    public static Future<Void> asyncAbort(@Nullable AsyncAbortable abortable,
-                                          boolean swallowIOException) {
-        if (null == abortable) {
-            return Future.Void();
-        } else if (swallowIOException) {
-            return FutureUtils.ignore(abortable.asyncAbort());
-        } else {
-            return abortable.asyncAbort();
-        }
-    }
-
-    /**
-     * Aborts a {@link Abortable}, with control over whether an {@link 
IOException} may be thrown.
-     * This is primarily useful in a finally block, where a thrown exception 
needs to be logged but
-     * not propagated (otherwise the original exception will be lost).
-     *
-     * <p>If {@code swallowIOException} is true then we never throw {@code 
IOException} but merely log it.
-     *
-     * <p>Example: <pre>   {@code
-     *
-     *   public void abortStreamNicely() throws IOException {
-     *      SomeStream stream = new SomeStream("foo");
-     *      try {
-     *          // ... code which does something with the stream ...
-     *      } catch (IOException ioe) {
-     *          // If an exception occurs, we might abort the stream.
-     *          Abortables.abort(stream, true);
-     *      }
-     *   }}</pre>
-     *
-     * @param abortable the {@code Abortable} object to be aborted, or null, 
in which case this method
-     *                  does nothing.
-     * @param swallowIOException if true, don't propagate IO exceptions thrown 
by the {@code abort} methods
-     * @throws IOException if {@code swallowIOException} is false and {@code 
abort} throws an {@code IOException}
-     */
-    public static void abort(@Nullable Abortable abortable,
-                             boolean swallowIOException)
-        throws IOException {
-        if (null == abortable) {
-            return;
-        }
-        try {
-            abortable.abort();
-        } catch (IOException ioe) {
-            if (swallowIOException) {
-                logger.warn("IOException thrown while aborting Abortable {} : 
", abortable, ioe);
-            } else {
-                throw ioe;
-            }
-        }
-    }
-
-    /**
-     * Abort async <i>abortable</i>
-     *
-     * @param abortable the {@code AsyncAbortable} object to be aborted, or 
null, in which case this method
-     *                  does nothing.
-     * @param swallowIOException if true, don't propagate IO exceptions thrown 
by the {@code abort} methods
-     * @throws IOException if {@code swallowIOException} is false and {@code 
abort} throws an {@code IOException}
-     * @see #abort(Abortable, boolean)
-     */
-    public static void abort(@Nullable AsyncAbortable abortable,
-                             boolean swallowIOException)
-            throws IOException {
-        if (null == abortable) {
-            return;
-        }
-        try {
-            FutureUtils.result(abortable.asyncAbort());
-        } catch (IOException ioe) {
-            if (swallowIOException) {
-                logger.warn("IOException thrown while aborting Abortable {} : 
", abortable, ioe);
-            } else {
-                throw ioe;
-            }
-        }
-    }
-
-    /**
-     * Aborts the given {@code abortable}, logging any {@code IOException} 
that's thrown rather than
-     * propagating it.
-     *
-     * While it's not safe in the general case to ignore exceptions that are 
thrown when aborting an
-     * I/O resource, it should generally be safe in the case of a resource 
that's being used only for
-     * reading.
-     *
-     * @param abortable the {@code Abortable} to be closed, or {@code null} in 
which case this method
-     *                  does nothing.
-     */
-    public static void abortQuietly(@Nullable Abortable abortable) {
-        try {
-            abort(abortable, true);
-        } catch (IOException e) {
-            logger.error("Unexpected IOException thrown while aborting 
Abortable {} quietly : ", abortable, e);
-        }
-    }
-
-    /**
-     * Aborts the given {@code abortable}, logging any {@code IOException} 
that's thrown rather than
-     * propagating it.
-     *
-     * While it's not safe in the general case to ignore exceptions that are 
thrown when aborting an
-     * I/O resource, it should generally be safe in the case of a resource 
that's being used only for
-     * reading.
-     *
-     * @param abortable the {@code AsyncAbortable} to be closed, or {@code 
null} in which case this method
-     *                  does nothing.
-     */
-    public static void abortQuietly(@Nullable AsyncAbortable abortable) {
-        try {
-            abort(abortable, true);
-        } catch (IOException e) {
-            logger.error("Unexpected IOException thrown while aborting 
Abortable {} quietly : ", abortable, e);
-        }
-    }
-
-    /**
-     * Abort the abortables in sequence.
-     *
-     * @param executorService
-     *          executor service to execute
-     * @param abortables
-     *          abortables to abort
-     * @return future represents the abort future
-     */
-    public static Future<Void> abortSequence(ExecutorService executorService,
-                                             AsyncAbortable... abortables) {
-        List<AsyncAbortable> abortableList = 
Lists.newArrayListWithExpectedSize(abortables.length);
-        for (AsyncAbortable abortable : abortables) {
-            if (null == abortable) {
-                abortableList.add(AsyncAbortable.NULL);
-            } else {
-                abortableList.add(abortable);
-            }
-        }
-        return FutureUtils.processList(
-                abortableList,
-                AsyncAbortable.ABORT_FUNC,
-                executorService).map(VoidFunctions.LIST_TO_VOID_FUNC);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
deleted file mode 100644
index 7ec26a1..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-
-/**
- * An {@code Abortable} is a source or destination of data that can be aborted.
- * The abort method is invoked to release resources that the object is holding
- * (such as open files). The abort happens when the object is in an error state,
- * which it couldn't be closed gracefully.
- *
- * @see AsyncCloseable
- * @see Abortable
- * @since 0.3.43
- */
-public interface AsyncAbortable {
-
-    Function<AsyncAbortable, Future<Void>> ABORT_FUNC = new Function<AsyncAbortable, Future<Void>>() {
-        @Override
-        public Future<Void> apply(AsyncAbortable abortable) {
-            return abortable.asyncAbort();
-        }
-    };
-
-    AsyncAbortable NULL = new AsyncAbortable() {
-        @Override
-        public Future<Void> asyncAbort() {
-            return Future.Void();
-        }
-    };
-
-    /**
-     * Aborts the object and releases any resources associated with it.
-     * If the object is already aborted then invoking this method has no
-     * effect.
-     *
-     * @return future represents the abort result
-     */
-    Future<Void> asyncAbort();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
deleted file mode 100644
index 2bf0119..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-
-/**
- * A {@code AsyncCloseable} is a source or destination of data that can be closed asynchronously.
- * The close method is invoked to release resources that the object is
- * holding (such as open files).
- */
-public interface AsyncCloseable {
-
-    Function<AsyncCloseable, Future<Void>> CLOSE_FUNC = new Function<AsyncCloseable, Future<Void>>() {
-        @Override
-        public Future<Void> apply(AsyncCloseable closeable) {
-            return closeable.asyncClose();
-        }
-    };
-
-    Function<AsyncCloseable, Future<Void>> CLOSE_FUNC_IGNORE_ERRORS = new Function<AsyncCloseable, Future<Void>>() {
-        @Override
-        public Future<Void> apply(AsyncCloseable closeable) {
-            return FutureUtils.ignore(closeable.asyncClose());
-        }
-    };
-
-    AsyncCloseable NULL = new AsyncCloseable() {
-        @Override
-        public Future<Void> asyncClose() {
-            return Future.Void();
-        }
-    };
-
-    /**
-     * Closes this source and releases any system resources associated
-     * with it. If the source is already closed then invoking this
-     * method has no effect.
-     *
-     * @return future representing the close result.
-     */
-    Future<Void> asyncClose();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
deleted file mode 100644
index 046c731..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import com.twitter.util.Future;
-
-/**
- * A {@code AsyncDeleteable} is a source or destination of data that can be deleted asynchronously.
- * This delete method is invoked to delete the source.
- */
-public interface AsyncDeleteable {
-    /**
-     * Releases any system resources associated with this and delete the source. If the source is
-     * already deleted then invoking this method has no effect.
-     *
-     * @return future representing the deletion result.
-     */
-    Future<Void> delete();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java
deleted file mode 100644
index eb81cfe..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * IO Utils for distributedlog
- */
-package org.apache.distributedlog.io;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java
index 95165ef..ae01bf7 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java
@@ -17,16 +17,10 @@
  */
 package org.apache.distributedlog.limiter;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.limiter.GuavaRateLimiter;
-import org.apache.distributedlog.limiter.RateLimiter;
-
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
-
+import org.apache.distributedlog.exceptions.OverCapacityException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java
index 986678c..156d6dd 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java
@@ -17,9 +17,9 @@
  */
 package org.apache.distributedlog.lock;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
 
 /**
  * Interface for distributed locking
@@ -31,7 +31,7 @@ public interface DistributedLock extends AsyncCloseable {
      *
      * @return future represents the acquire result.
      */
-    Future<? extends DistributedLock> asyncAcquire();
+    CompletableFuture<? extends DistributedLock> asyncAcquire();
 
     /**
      * Check if hold lock. If it doesn't, then re-acquire the lock.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
index b70098e..1cb3364 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
@@ -17,10 +17,13 @@
  */
 package org.apache.distributedlog.lock;
 
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.Timer;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,11 +36,11 @@ public class LockWaiter {
 
     private final String lockId;
     private final String currentOwner;
-    private final Future<Boolean> acquireFuture;
+    private final CompletableFuture<Boolean> acquireFuture;
 
     public LockWaiter(String lockId,
                       String currentOwner,
-                      Future<Boolean> acquireFuture) {
+                      CompletableFuture<Boolean> acquireFuture) {
         this.lockId = lockId;
         this.currentOwner = currentOwner;
         this.acquireFuture = acquireFuture;
@@ -64,12 +67,13 @@ public class LockWaiter {
     /**
      * Return the future representing the waiting result.
      *
-     * <p>If the future is interrupted (e.g. {@link Future#within(Duration, Timer)}),
+     * <p>If the future is interrupted
+     * (e.g. {@link FutureUtils#within(CompletableFuture, long, TimeUnit, Throwable, OrderedScheduler, Object)}),
      * the waiter will automatically clean up its waiting state.
      *
      * @return the future representing the acquire result.
      */
-    public Future<Boolean> getAcquireFuture() {
+    public CompletableFuture<Boolean> getAcquireFuture() {
         return acquireFuture;
     }
 
@@ -81,12 +85,12 @@ public class LockWaiter {
     public boolean waitForAcquireQuietly() {
         boolean success = false;
         try {
-            success = Await.result(acquireFuture);
-        } catch (InterruptedException ie) {
+            success = Utils.ioResult(acquireFuture);
+        } catch (DLInterruptedException ie) {
             Thread.currentThread().interrupt();
         } catch (LockTimeoutException lte) {
             logger.debug("Timeout on lock acquiring", lte);
-        } catch (Exception e) {
+        } catch (IOException e) {
             logger.error("Caught exception waiting for lock acquired", e);
         }
         return success;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java
index 88abffa..7f770ad 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java
@@ -17,8 +17,9 @@
  */
 package org.apache.distributedlog.lock;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.exceptions.LockingException;
-import com.twitter.util.Future;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 
 /**
  * An implementation of {@link DistributedLock} which does nothing.
@@ -30,8 +31,8 @@ public class NopDistributedLock implements DistributedLock {
     private NopDistributedLock() {}
 
     @Override
-    public Future<? extends DistributedLock> asyncAcquire() {
-        return Future.value(this);
+    public CompletableFuture<? extends DistributedLock> asyncAcquire() {
+        return FutureUtils.value(this);
     }
 
     @Override
@@ -45,7 +46,7 @@ public class NopDistributedLock implements DistributedLock {
     }
 
     @Override
-    public Future<Void> asyncClose() {
-        return Future.Void();
+    public CompletableFuture<Void> asyncClose() {
+        return FutureUtils.Void();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java
index 8aec2c0..3a46a13 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java
@@ -17,12 +17,10 @@
  */
 package org.apache.distributedlog.lock;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import com.twitter.util.Future;
-import scala.runtime.BoxedUnit;
-
-import java.util.concurrent.TimeUnit;
 
 /**
  * One time lock.
@@ -71,7 +69,7 @@ public interface SessionLock {
      * <i>tryLock</i> here is effectively the combination of following asynchronous calls.
      * <pre>
      *     ZKDistributedLock lock = ...;
-     *     Future<LockWaiter> attemptFuture = lock.asyncTryLock(...);
+     *     CompletableFuture<LockWaiter> attemptFuture = lock.asyncTryLock(...);
      *
      *     boolean acquired = waiter.waitForAcquireQuietly();
      *     if (acquired) {
@@ -106,7 +104,7 @@ public interface SessionLock {
      * @return lock waiter representing this attempt of acquiring lock.
      * @see #tryLock(long, TimeUnit)
      */
-    Future<LockWaiter> asyncTryLock(long timeout, TimeUnit unit);
+    CompletableFuture<LockWaiter> asyncTryLock(long timeout, TimeUnit unit);
 
     /**
      * Release a claimed lock.
@@ -121,6 +119,6 @@ public interface SessionLock {
      * @return future representing the result of unlock operation.
      * @see #unlock()
      */
-    Future<BoxedUnit> asyncUnlock();
+    CompletableFuture<Void> asyncUnlock();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java
index a68f2d8..9d3159e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.lock;
 
-import com.twitter.util.Future;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Factory to create {@link SessionLock}
@@ -33,6 +33,6 @@ public interface SessionLockFactory {
      *          lock context
      * @return future represents the creation result.
      */
-    Future<SessionLock> createLock(String lockPath, DistributedLockContext context);
+    CompletableFuture<SessionLock> createLock(String lockPath, DistributedLockContext context);
 
 }

Reply via email to