[ 
https://issues.apache.org/jira/browse/DL-124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035933#comment-16035933
 ] 

ASF GitHub Bot commented on DL-124:
-----------------------------------

Github user jiazhai commented on a diff in the pull request:

    
https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984594
  
    --- Diff: 
distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
 ---
    @@ -525,75 +495,63 @@ public BKSyncLogWriter 
startLogSegmentNonPartitioned() throws IOException {
          */
         @Override
         public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws 
IOException {
    -        return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
    +        return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter());
         }
     
         @Override
    -    public Future<AsyncLogWriter> openAsyncLogWriter() {
    +    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
             try {
                 checkClosedOrInError("startLogSegmentNonPartitioned");
             } catch (AlreadyClosedException e) {
    -            return Future.exception(e);
    +            return FutureUtils.exception(e);
             }
     
    -        Future<BKLogWriteHandler> createWriteHandleFuture;
    +        CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
             synchronized (this) {
                 // 1. create the locked write handler
                 createWriteHandleFuture = asyncCreateWriteHandler(true);
             }
    -        return createWriteHandleFuture.flatMap(new 
AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() {
    -            @Override
    -            public Future<AsyncLogWriter> apply(final BKLogWriteHandler 
writeHandler) {
    -                final BKAsyncLogWriter writer;
    -                synchronized (BKDistributedLogManager.this) {
    -                    // 2. create the writer with the handler
    -                    writer = new BKAsyncLogWriter(
    -                            conf,
    -                            dynConf,
    -                            BKDistributedLogManager.this,
    -                            writeHandler,
    -                            featureProvider,
    -                            statsLogger);
    -                }
    -                // 3. recover the incomplete log segments
    -                return writeHandler.recoverIncompleteLogSegments()
    -                        .map(new AbstractFunction1<Long, AsyncLogWriter>() 
{
    -                            @Override
    -                            public AsyncLogWriter apply(Long lastTxId) {
    -                                // 4. update last tx id if successfully 
recovered
    -                                writer.setLastTxId(lastTxId);
    -                                return writer;
    -                            }
    -                        }).onFailure(new AbstractFunction1<Throwable, 
BoxedUnit>() {
    -                            @Override
    -                            public BoxedUnit apply(Throwable cause) {
    -                                // 5. close the writer if recovery failed
    -                                writer.asyncAbort();
    -                                return BoxedUnit.UNIT;
    -                            }
    -                        });
    +        return createWriteHandleFuture.thenCompose(writeHandler -> {
    +            final BKAsyncLogWriter writer;
    +            synchronized (BKDistributedLogManager.this) {
    +                // 2. create the writer with the handler
    +                writer = new BKAsyncLogWriter(
    +                        conf,
    +                        dynConf,
    +                        BKDistributedLogManager.this,
    +                        writeHandler,
    +                        featureProvider,
    +                        statsLogger);
                 }
    +            // 3. recover the incomplete log segments
    +            return writeHandler.recoverIncompleteLogSegments()
    +                .thenApply(lastTxId -> {
    +                    // 4. update last tx id if successfully recovered
    +                    writer.setLastTxId(lastTxId);
    +                    return (AsyncLogWriter) writer;
    +                })
    +                .whenComplete((lastTxId, cause) -> {
    +                    if (null != cause) {
    +                        // 5. close the writer if recovery failed
    +                        writer.asyncAbort();
    +                    }
    +                });
             });
         }
     
         @Override
    -    public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
    -        return getLogSegmentsAsync().flatMap(new 
AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() {
    -            @Override
    -            public Future<DLSN> apply(List<LogSegmentMetadata> segments) {
    -                return getDLSNNotLessThanTxId(fromTxnId, segments);
    -            }
    -        });
    +    public CompletableFuture<DLSN> getDLSNNotLessThanTxId(final long 
fromTxnId) {
    +        return getLogSegmentsAsync().thenCompose(segments -> 
getDLSNNotLessThanTxId(fromTxnId, segments));
         }
     
    -    private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
    +    private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
                                                     final 
List<LogSegmentMetadata> segments) {
    --- End diff --
    
    Please also fix the code alignment in this file, including at lines 565, 
707, and 838.


> Use Java8 Future rather than twitter Future
> -------------------------------------------
>
>                 Key: DL-124
>                 URL: https://issues.apache.org/jira/browse/DL-124
>             Project: DistributedLog
>          Issue Type: Bug
>    Affects Versions: 0.4.0
>            Reporter: Gerrit Sundaram
>            Assignee: Sijie Guo
>              Labels: help-wanted
>             Fix For: 0.5.0
>
>
> Since DistributedLog is written in Java, it would be good to leverage Java 8 
> futures rather than introducing dependencies on Scala.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to