[
https://issues.apache.org/jira/browse/DL-124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035933#comment-16035933
]
ASF GitHub Bot commented on DL-124:
-----------------------------------
Github user jiazhai commented on a diff in the pull request:
https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984594
--- Diff:
distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
---
@@ -525,75 +495,63 @@ public BKSyncLogWriter
startLogSegmentNonPartitioned() throws IOException {
*/
@Override
public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws
IOException {
- return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
+ return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter());
}
@Override
- public Future<AsyncLogWriter> openAsyncLogWriter() {
+ public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
try {
checkClosedOrInError("startLogSegmentNonPartitioned");
} catch (AlreadyClosedException e) {
- return Future.exception(e);
+ return FutureUtils.exception(e);
}
- Future<BKLogWriteHandler> createWriteHandleFuture;
+ CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
synchronized (this) {
// 1. create the locked write handler
createWriteHandleFuture = asyncCreateWriteHandler(true);
}
- return createWriteHandleFuture.flatMap(new
AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() {
- @Override
- public Future<AsyncLogWriter> apply(final BKLogWriteHandler
writeHandler) {
- final BKAsyncLogWriter writer;
- synchronized (BKDistributedLogManager.this) {
- // 2. create the writer with the handler
- writer = new BKAsyncLogWriter(
- conf,
- dynConf,
- BKDistributedLogManager.this,
- writeHandler,
- featureProvider,
- statsLogger);
- }
- // 3. recover the incomplete log segments
- return writeHandler.recoverIncompleteLogSegments()
- .map(new AbstractFunction1<Long, AsyncLogWriter>()
{
- @Override
- public AsyncLogWriter apply(Long lastTxId) {
- // 4. update last tx id if successfully
recovered
- writer.setLastTxId(lastTxId);
- return writer;
- }
- }).onFailure(new AbstractFunction1<Throwable,
BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable cause) {
- // 5. close the writer if recovery failed
- writer.asyncAbort();
- return BoxedUnit.UNIT;
- }
- });
+ return createWriteHandleFuture.thenCompose(writeHandler -> {
+ final BKAsyncLogWriter writer;
+ synchronized (BKDistributedLogManager.this) {
+ // 2. create the writer with the handler
+ writer = new BKAsyncLogWriter(
+ conf,
+ dynConf,
+ BKDistributedLogManager.this,
+ writeHandler,
+ featureProvider,
+ statsLogger);
}
+ // 3. recover the incomplete log segments
+ return writeHandler.recoverIncompleteLogSegments()
+ .thenApply(lastTxId -> {
+ // 4. update last tx id if successfully recovered
+ writer.setLastTxId(lastTxId);
+ return (AsyncLogWriter) writer;
+ })
+ .whenComplete((lastTxId, cause) -> {
+ if (null != cause) {
+ // 5. close the writer if recovery failed
+ writer.asyncAbort();
+ }
+ });
});
}
@Override
- public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
- return getLogSegmentsAsync().flatMap(new
AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() {
- @Override
- public Future<DLSN> apply(List<LogSegmentMetadata> segments) {
- return getDLSNNotLessThanTxId(fromTxnId, segments);
- }
- });
+ public CompletableFuture<DLSN> getDLSNNotLessThanTxId(final long
fromTxnId) {
+ return getLogSegmentsAsync().thenCompose(segments ->
getDLSNNotLessThanTxId(fromTxnId, segments));
}
- private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
+ private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
final
List<LogSegmentMetadata> segments) {
--- End diff --
Please also do the code alignment change in this file, also at line 565,
707, 838.
> Use Java8 Future rather than twitter Future
> -------------------------------------------
>
> Key: DL-124
> URL: https://issues.apache.org/jira/browse/DL-124
> Project: DistributedLog
> Issue Type: Bug
> Affects Versions: 0.4.0
> Reporter: Gerrit Sundaram
> Assignee: Sijie Guo
> Labels: help-wanted
> Fix For: 0.5.0
>
>
> Since it is written in java, it would be good to leverage java8 future rather
> than introducing dependencies on scala.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)