This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 29257d0 If cursor update to BK fails, fallback to meta store (#1461)
29257d0 is described below
commit 29257d0747a7f044f5e46b55db973eed0e15f1f9
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 28 00:23:28 2018 -0700
If cursor update to BK fails, fallback to meta store (#1461)
* If cursor update to BK fails, fallback to meta store
* Fixed missing parameters in logs
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++++++++++++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 26 +++++++++++++++++++---
.../mledger/impl/ManagedLedgerErrorsTest.java | 1 +
.../mledger/impl/NonDurableCursorTest.java | 8 ++-----
4 files changed, 47 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 45fcf90..c2fbbab 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1820,6 +1820,7 @@ public class ManagedCursorImpl implements ManagedCursor {
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
+ cursorLedgerStat = stat;
callback.operationComplete(result, stat);
}
@@ -2055,7 +2056,26 @@ public class ManagedCursorImpl implements ManagedCursor {
// If we've had a write error, the ledger will be automatically closed, we need to create a new one,
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
- callback.operationFailed(createManagedLedgerException(rc));
+
+ // Before giving up, try to persist the position in the metadata store
+ persistPositionMetaStore(-1, position, mdEntry.properties, new MetaStoreCallback<Void>() {
+ @Override
+ public void operationComplete(Void result, Stat stat) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "[{}][{}] Updated cursor in meta store after previous failure in ledger at position {}",
+ ledger.getName(), name, position);
+ }
+ callback.operationComplete();
+ }
+
+ @Override
+ public void operationFailed(MetaStoreException e) {
+ log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
+ ledger.getName(), name, e.getMessage());
+ callback.operationFailed(createManagedLedgerException(rc));
+ }
+ }, true);
}
}, null);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 312b894..ad85a35 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.zookeeper.KeeperException.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
@@ -409,11 +410,29 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
stopBookKeeper();
assertEquals(entries.size(), 1);
+ // Mark-delete should succeed if BK is down
+ cursor.markDelete(entries.get(0).getPosition());
+
+ entries.forEach(e -> e.release());
+ }
+
+ @Test(timeOut = 20000)
+ void markDeleteWithZKErrors() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger");
+ ManagedCursor cursor = ledger.openCursor("c1");
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ List<Entry> entries = cursor.readEntries(100);
+
+ assertEquals(entries.size(), 1);
+
+ stopBookKeeper();
+ stopZooKeeper();
+
try {
cursor.markDelete(entries.get(0).getPosition());
- fail("call should have failed");
- } catch (ManagedLedgerException e) {
- // ok
+ fail("Should have failed");
+ } catch (Exception e) {
+ // Expected
}
entries.forEach(e -> e.release());
@@ -1022,6 +1041,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ManagedLedger ledger = factory.open("my_test_ledger");
bkc.failAfter(1, BKException.Code.NotEnoughBookiesException);
+ zkc.failNow(Code.SESSIONEXPIRED);
try {
ledger.openCursor("c1");
fail("should have failed");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 5004d28..4efeefb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -486,6 +486,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
Position position = ledger.addEntry("entry".getBytes());
bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
+ zkc.failNow(Code.CONNECTIONLOSS);
try {
cursor.markDelete(position);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index afc751c..ff99da3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -252,12 +252,8 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
stopBookKeeper();
assertEquals(entries.size(), 1);
- try {
- cursor.markDelete(entries.get(0).getPosition());
- fail("call should have failed");
- } catch (ManagedLedgerException e) {
- // ok
- }
+ // Mark-delete should succeed if BK is down
+ cursor.markDelete(entries.get(0).getPosition());
entries.forEach(e -> e.release());
}
--
To stop receiving notification emails like this one, please contact
[email protected].