Github user jiazhai commented on a diff in the pull request:
https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984594
--- Diff:
distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
---
@@ -525,75 +495,63 @@ public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException {
*/
@Override
public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
- return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
+ return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter());
}
@Override
- public Future<AsyncLogWriter> openAsyncLogWriter() {
+ public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
try {
checkClosedOrInError("startLogSegmentNonPartitioned");
} catch (AlreadyClosedException e) {
- return Future.exception(e);
+ return FutureUtils.exception(e);
}
- Future<BKLogWriteHandler> createWriteHandleFuture;
+ CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
synchronized (this) {
// 1. create the locked write handler
createWriteHandleFuture = asyncCreateWriteHandler(true);
}
- return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() {
- @Override
- public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler) {
- final BKAsyncLogWriter writer;
- synchronized (BKDistributedLogManager.this) {
- // 2. create the writer with the handler
- writer = new BKAsyncLogWriter(
- conf,
- dynConf,
- BKDistributedLogManager.this,
- writeHandler,
- featureProvider,
- statsLogger);
- }
- // 3. recover the incomplete log segments
- return writeHandler.recoverIncompleteLogSegments()
- .map(new AbstractFunction1<Long, AsyncLogWriter>() {
- @Override
- public AsyncLogWriter apply(Long lastTxId) {
- // 4. update last tx id if successfully recovered
- writer.setLastTxId(lastTxId);
- return writer;
- }
- }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable cause) {
- // 5. close the writer if recovery failed
- writer.asyncAbort();
- return BoxedUnit.UNIT;
- }
- });
+ return createWriteHandleFuture.thenCompose(writeHandler -> {
+ final BKAsyncLogWriter writer;
+ synchronized (BKDistributedLogManager.this) {
+ // 2. create the writer with the handler
+ writer = new BKAsyncLogWriter(
+ conf,
+ dynConf,
+ BKDistributedLogManager.this,
+ writeHandler,
+ featureProvider,
+ statsLogger);
}
+ // 3. recover the incomplete log segments
+ return writeHandler.recoverIncompleteLogSegments()
+ .thenApply(lastTxId -> {
+ // 4. update last tx id if successfully recovered
+ writer.setLastTxId(lastTxId);
+ return (AsyncLogWriter) writer;
+ })
+ .whenComplete((lastTxId, cause) -> {
+ if (null != cause) {
+ // 5. close the writer if recovery failed
+ writer.asyncAbort();
+ }
+ });
});
}
@Override
- public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
- return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() {
- @Override
- public Future<DLSN> apply(List<LogSegmentMetadata> segments) {
- return getDLSNNotLessThanTxId(fromTxnId, segments);
- }
- });
+ public CompletableFuture<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
+ return getLogSegmentsAsync().thenCompose(segments -> getDLSNNotLessThanTxId(fromTxnId, segments));
}
- private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
+ private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
final List<LogSegmentMetadata> segments) {
--- End diff --
Please also fix the code alignment in this file, including at lines 565,
707, and 838.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---