This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/distributedlog.git


The following commit(s) were added to refs/heads/master by this push:
     new 25090fc  Make distributedlog compiled with latest bookkeeper version
25090fc is described below

commit 25090fc94788948f2c6b1218ad63aa0e4db9c09b
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Nov 29 23:30:32 2017 -0800

    Make distributedlog compiled with latest bookkeeper version
    
    Descriptions of the changes in this PR:
    
    PendingReadOp was changed for the new bookkeeper API in bookkeeper's 
current master. DistributedLog uses PendingReadOp in some administration tools, 
so DistributedLog's current master does not compile against the latest 
bookkeeper version. This change fixes that.
    
    The change here includes:
    
    - bump bk to 4.7.0-SNAPSHOT (will switch to 4.6.0 after it is released)
    - switch to the CompletableFuture exposed by the latest PendingReadOp. 
(This change does not aim to make distributedlog work with the new API.)
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #240 from sijie/sijie/use_new_ledger_api
---
 .../org/apache/bookkeeper/client/LedgerReader.java | 67 ++++++++++++++--------
 pom.xml                                            |  2 +-
 2 files changed, 43 insertions(+), 26 deletions(-)

diff --git 
a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
 
b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index ccdc52b..6c6bd4a 100644
--- 
a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ 
b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -21,27 +21,27 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException.BKNoSuchEntryException;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.DistributionSchedule.WriteSet;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Reader used for DL tools to read entries.
  */
 public class LedgerReader {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(LedgerReader.class);
-
     /**
      * Read Result Holder.
      */
@@ -137,21 +137,42 @@ public class LedgerReader {
                                                     final 
GenericCallback<List<LedgerEntry>> callback) {
         final List<LedgerEntry> resultList = new ArrayList<LedgerEntry>();
 
-        final AsyncCallback.ReadCallback readCallback = new 
AsyncCallback.ReadCallback() {
+        final FutureEventListener<LedgerEntries> readListener = new 
FutureEventListener<LedgerEntries>() {
+
+            private void readNext(long entryId) {
+                PendingReadOp op = new PendingReadOp(lh, lh.bk.scheduler, 
entryId, entryId, false);
+                op.future().whenComplete(this);
+                op.submit();
+            }
+
             @Override
-            public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> entries, Object ctx) {
-                if (BKException.Code.NoSuchEntryException == rc) {
-                    callback.operationComplete(BKException.Code.OK, 
resultList);
-                } else if (BKException.Code.OK == rc) {
-                    while (entries.hasMoreElements()) {
-                        resultList.add(entries.nextElement());
-                    }
-                    long entryId = (Long) ctx;
-                    ++entryId;
-                    PendingReadOp readOp = new PendingReadOp(lh, 
lh.bk.scheduler, entryId, entryId, this, entryId);
-                    readOp.initiate();
+            public void onSuccess(LedgerEntries ledgerEntries) {
+                long entryId = -1L;
+                for (org.apache.bookkeeper.client.api.LedgerEntry entry : 
ledgerEntries) {
+                    resultList.add(new LedgerEntry((LedgerEntryImpl) entry));
+                    entryId = entry.getEntryId();
+                }
+                try {
+                    ledgerEntries.close();
+                } catch (Exception e) {
+                    // bk should not throw any exceptions here
+                }
+                ++entryId;
+                readNext(entryId);
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                if (throwable instanceof BKNoSuchEntryException) {
+                    callback.operationComplete(Code.OK, resultList);
                 } else {
-                    callback.operationComplete(rc, resultList);
+                    int retCode;
+                    if (throwable instanceof BKException) {
+                        retCode = ((BKException) throwable).getCode();
+                    } else {
+                        retCode = Code.UnexpectedConditionException;
+                    }
+                    callback.operationComplete(retCode, resultList);
                 }
             }
         };
@@ -169,13 +190,9 @@ public class LedgerReader {
             }
 
             long entryId = recoveryData.lastAddConfirmed;
-            PendingReadOp readOp = new PendingReadOp(lh, lh.bk.scheduler, 
entryId, entryId, readCallback, entryId);
-            try {
-                readOp.initiate();
-            } catch (Throwable t) {
-                logger.error("Failed to initialize pending read entry {} for 
ledger {} : ",
-                             new Object[] { entryId, lh.getLedgerMetadata(), t 
});
-            }
+            PendingReadOp op = new PendingReadOp(lh, lh.bk.scheduler, entryId, 
entryId, false);
+            op.future().whenComplete(readListener);
+            op.submit();
         };
         // Read Last AddConfirmed
         new ReadLastConfirmedOp(lh, readLACCallback).initiate();
diff --git a/pom.xml b/pom.xml
index 616d755..738ef81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,7 +100,7 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <!-- dependencies -->
-    <bookkeeper.version>4.6.0-SNAPSHOT</bookkeeper.version>
+    <bookkeeper.version>4.7.0-SNAPSHOT</bookkeeper.version>
     <codahale.metrics.version>3.0.1</codahale.metrics.version>
     <commons-cli.version>1.1</commons-cli.version>
     <commons-codec.version>1.6</commons-codec.version>

-- 
To stop receiving notification emails like this one, please contact
['"distributedlog-commits@bookkeeper.apache.org" 
<distributedlog-commits@bookkeeper.apache.org>'].

Reply via email to