eolivelli closed pull request #1572: Use readExplicitLAC instead of readEntry 
in order to get current LastAddConfirmed
URL: https://github.com/apache/bookkeeper/pull/1572
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not display its
diff after the merge, so the full diff is supplied below:

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
index ac56a1fbfb..71c972e7f1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
@@ -48,6 +48,7 @@
     final long timeoutMonitorIntervalSec;
     final boolean enableBookieFailureTracking;
     final boolean useV2WireProtocol;
+    final boolean useExplicitLacForReads;
 
     static ClientInternalConf defaultValues() {
         return fromConfig(new ClientConfiguration());
@@ -79,6 +80,7 @@ private ClientInternalConf(ClientConfiguration conf,
         this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges();
         this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec();
         this.enableBookieFailureTracking = 
conf.getEnableBookieFailureTracking();
+        this.useExplicitLacForReads = conf.isUseExplicitLacForReads();
         this.useV2WireProtocol = conf.getUseV2WireProtocol();
 
         if (conf.getFirstSpeculativeReadTimeout() > 0) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 157a1b84ca..6ea9be78b2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1335,6 +1335,14 @@ synchronized void updateLastConfirmed(long lac, long 
len) {
      */
 
     public void asyncReadLastConfirmed(final ReadLastConfirmedCallback cb, 
final Object ctx) {
+        if (clientCtx.getConf().useExplicitLacForReads) {
+            asyncReadExplicitLastConfirmed(cb, ctx);
+        } else {
+            asyncReadPiggybackLastConfirmed(cb, ctx);
+        }
+    }
+
+    private void asyncReadPiggybackLastConfirmed(final 
ReadLastConfirmedCallback cb, final Object ctx) {
         boolean isClosed;
         long lastEntryId;
         synchronized (this) {
@@ -1377,6 +1385,14 @@ public void readLastConfirmedDataComplete(int rc, 
DigestManager.RecoveryData dat
      *          callback context
      */
     public void asyncTryReadLastConfirmed(final ReadLastConfirmedCallback cb, 
final Object ctx) {
+        if (clientCtx.getConf().useExplicitLacForReads) {
+            asyncTryReadExplicitLastConfirmed(cb, ctx);
+        } else {
+            asyncTryReadPiggybackLastConfirmed(cb, ctx);
+        }
+    }
+
+    private void asyncTryReadPiggybackLastConfirmed(final 
ReadLastConfirmedCallback cb, final Object ctx) {
         boolean isClosed;
         long lastEntryId;
         synchronized (this) {
@@ -1661,6 +1677,37 @@ public void getLacComplete(int rc, long lac) {
         new PendingReadLacOp(this, clientCtx.getBookieClient(), 
getCurrentEnsemble(), innercb).initiate();
     }
 
+    void asyncTryReadExplicitLastConfirmed(final ReadLastConfirmedCallback cb, 
final Object ctx) {
+        boolean isClosed;
+        synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
+            isClosed = metadata.isClosed();
+            if (isClosed) {
+                lastAddConfirmed = metadata.getLastEntryId();
+                length = metadata.getLength();
+            }
+        }
+        if (isClosed) {
+            cb.readLastConfirmedComplete(BKException.Code.OK, 
lastAddConfirmed, ctx);
+            return;
+        }
+
+        TryPendingReadLacOp.LacCallback innercb = new 
TryPendingReadLacOp.LacCallback() {
+
+            @Override
+            public void getLacComplete(int rc, long lac) {
+                if (rc == BKException.Code.OK) {
+                    // here we are trying to update lac only but not length
+                    updateLastConfirmed(lac, 0);
+                    cb.readLastConfirmedComplete(rc, lac, ctx);
+                } else {
+                    cb.readLastConfirmedComplete(rc, INVALID_ENTRY_ID, ctx);
+                }
+            }
+        };
+        new TryPendingReadLacOp(this, clientCtx.getBookieClient(), 
getCurrentEnsemble(), innercb).initiate();
+    }
+
     /*
      * Obtains synchronously the explicit last add confirmed from a quorum of
      * bookies. This call obtains Explicit LAC value and piggy-backed LAC 
value (just like
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryPendingReadLacOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryPendingReadLacOp.java
new file mode 100644
index 0000000000..40d2601323
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryPendingReadLacOp.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This op is try to read last confirmed without involving quorum coverage 
checking.
+ * Use {@link PendingReadLacOp} if you need quorum coverage checking.
+ */
+
+class TryPendingReadLacOp implements ReadLacCallback {
+    static final Logger LOG = 
LoggerFactory.getLogger(TryPendingReadLacOp.class);
+    LedgerHandle lh;
+    LacCallback cb;
+    int numResponsesPending;
+    volatile boolean completed = false;
+    volatile boolean hasValidResponse = false;
+    int lastSeenError = BKException.Code.ReadException;
+    RecoveryData maxRecoveredData;
+    long maxLac;
+    final List<BookieSocketAddress> currentEnsemble;
+    final BookieClient bookieClient;
+
+    /*
+     * Wrapper to get Lac from the request
+     */
+    interface LacCallback {
+        void getLacComplete(int rc, long lac);
+    }
+
+    TryPendingReadLacOp(LedgerHandle lh, BookieClient bookieClient,
+                        List<BookieSocketAddress> ensemble, LacCallback cb) {
+        this.lh = lh;
+        this.cb = cb;
+        this.maxLac = lh.getLastAddConfirmed();
+        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
+        this.bookieClient = bookieClient;
+        this.maxRecoveredData = new RecoveryData(maxLac, 0);
+        this.currentEnsemble = ensemble;
+    }
+
+    public void initiate() {
+        for (int i = 0; i < currentEnsemble.size(); i++) {
+            bookieClient.readLac(currentEnsemble.get(i),
+                    lh.ledgerId, this, i);
+        }
+    }
+
+    @Override
+    public void readLacComplete(int rc, long ledgerId, final ByteBuf 
lacBuffer, final ByteBuf lastEntryBuffer,
+            Object ctx) {
+        int bookieIndex = (Integer) ctx;
+
+        numResponsesPending--;
+
+        if (completed) {
+            return;
+        }
+
+
+        if (rc == BKException.Code.OK) {
+            try {
+                // Each bookie may have two store LAC in two places.
+                // One is in-memory copy in FileInfo and other is
+                // piggy-backed LAC on the last entry.
+                // This routine picks both of them and compares to return
+                // the latest Lac.
+
+                // lacBuffer and lastEntryBuffer are optional in the protocol.
+                // So check if they exist before processing them.
+                long newLac = LedgerHandle.INVALID_ENTRY_ID;
+                // Extract lac from FileInfo on the ledger.
+                if (lacBuffer != null && lacBuffer.readableBytes() > 0) {
+                    long lac = 
lh.macManager.verifyDigestAndReturnLac(lacBuffer);
+                    if (lac > maxLac) {
+                        newLac = lac;
+                    }
+                }
+                // Extract lac from last entry on the disk
+                if (lastEntryBuffer != null && lastEntryBuffer.readableBytes() 
> 0) {
+                    RecoveryData recoveryData = 
lh.macManager.verifyDigestAndReturnLastConfirmed(lastEntryBuffer);
+                    long piggyBackedLAC = recoveryData.getLastAddConfirmed();
+                    if (piggyBackedLAC > newLac) {
+                        newLac = piggyBackedLAC;
+                    }
+                }
+                if (newLac > maxLac) {
+                    // as for TryReadLastConfirmedOp we will call the callback 
as soon as possible
+                    cb.getLacComplete(rc, newLac);
+                    completed = true;
+                }
+                maxLac = newLac;
+
+                hasValidResponse = true;
+            } catch (BKDigestMatchException e) {
+                // Too bad, this bookie did not give us a valid answer, we
+                // still might be able to recover. So, continue
+                LOG.error("Mac mismatch while reading  ledger: " + ledgerId + 
" LAC from bookie: "
+                        + currentEnsemble.get(bookieIndex));
+                rc = BKException.Code.DigestMatchException;
+            }
+        }
+
+        if (rc == BKException.Code.NoSuchLedgerExistsException || rc == 
BKException.Code.NoSuchEntryException) {
+            hasValidResponse = true;
+        }
+
+        if (rc == BKException.Code.UnauthorizedAccessException && !completed) {
+            cb.getLacComplete(rc, maxLac);
+            completed = true;
+            return;
+        }
+
+        if (!hasValidResponse && BKException.Code.OK != rc) {
+            lastSeenError = rc;
+        }
+
+        if (numResponsesPending == 0 && !completed) {
+            if (!hasValidResponse) {
+                cb.getLacComplete(lastSeenError, maxLac);
+            } else {
+                cb.getLacComplete(BKException.Code.OK, maxLac);
+            }
+            completed = true;
+        }
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 7d0d319882..4e273bd106 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -121,6 +121,7 @@
     protected static final String TIMEOUT_MONITOR_INTERVAL_SEC = 
"timeoutMonitorIntervalSec";
     protected static final String TIMEOUT_TASK_INTERVAL_MILLIS = 
"timeoutTaskIntervalMillis";
     protected static final String EXPLICIT_LAC_INTERVAL = 
"explicitLacInterval";
+    protected static final String USE_EXPLICIT_LAC_FOR_READS = 
"useExplicitLacForReads";
     protected static final String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = 
"pcbcTimeoutTimerTickDurationMs";
     protected static final String PCBC_TIMEOUT_TIMER_NUM_TICKS = 
"pcbcTimeoutTimerNumTicks";
     protected static final String TIMEOUT_TIMER_TICK_DURATION_MS = 
"timeoutTimerTickDurationMs";
@@ -733,6 +734,28 @@ public ClientConfiguration setExplictLacInterval(int 
interval) {
         return this;
     }
 
+    /**
+     * Whether to enable reads of LastAddConfirmed using ExplicitLAC support.
+     *
+     * @return true if enable reads of LastAddConfirmed using ExplicitLAC 
support. otherwise, return false.
+     */
+    public boolean isUseExplicitLacForReads() {
+        return getBoolean(USE_EXPLICIT_LAC_FOR_READS, false);
+    }
+
+    /**
+     * Enable/Disable reads of LastAddConfirmed using ExplicitLAC support.
+     *
+     * @param value if true the client will consider ExplicitLAC while reading 
LastAddConfirmed
+     *              from Bookies.
+     *
+     * @return Client configuration.
+     */
+    public ClientConfiguration setUseExplicitLacForReads(boolean value) {
+        setProperty(USE_EXPLICIT_LAC_FOR_READS, value);
+        return this;
+    }
+
     /**
      * Get the tick duration in milliseconds that used for the
      * HashedWheelTimer that used by PCBC to timeout
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
index a4a63fa6c8..d9e5dc5cb5 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
@@ -21,6 +21,9 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,22 +32,35 @@
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test try read last confirmed.
  */
+@RunWith(Parameterized.class)
 public class TestTryReadLastConfirmed extends BookKeeperClusterTestCase {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TestTryReadLastConfirmed.class);
 
     final DigestType digestType;
 
-    public TestTryReadLastConfirmed() {
+    public TestTryReadLastConfirmed(boolean useExplicitLacForReads) {
         super(6);
         this.digestType = DigestType.CRC32;
+        this.baseClientConf.setUseExplicitLacForReads(useExplicitLacForReads);
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+            { true },
+            { false }
+        });
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to