ankit-j commented on a change in pull request #2096: Entries must be 
acknowledged by bookies in multiple fault domains before being acknowledged to 
client
URL: https://github.com/apache/bookkeeper/pull/2096#discussion_r287157906
 
 

 ##########
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
 ##########
 @@ -956,4 +964,152 @@ public void testLedgerDeletionIdempotency() throws 
Exception {
         bk.deleteLedger(ledgerId);
         bk.close();
     }
+
+    /**
+     * Mock of RackawareEnsemblePlacementPolicy. Overrides 
areAckedBookiesAdheringToPlacementPolicy to only return true
+     * when ackedBookies consists of writeQuorumSizeToUseForTesting bookies.
+     */
+    public static class MockRackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacementPolicy {
+        private int writeQuorumSizeToUseForTesting;
+        private CountDownLatch conditionFirstInvocationLatch;
+
+        void setWriteQuorumSizeToUseForTesting(int 
writeQuorumSizeToUseForTesting) {
+            this.writeQuorumSizeToUseForTesting = 
writeQuorumSizeToUseForTesting;
+        }
+
+        void setConditionFirstInvocationLatch(CountDownLatch 
conditionFirstInvocationLatch) {
+            this.conditionFirstInvocationLatch = conditionFirstInvocationLatch;
+        }
+
+        @Override
+        public boolean 
areAckedBookiesAdheringToPlacementPolicy(Set<BookieSocketAddress> ackedBookies,
+                                                                int 
writeQuorumSize,
+                                                                int 
ackQuorumSize) {
+            conditionFirstInvocationLatch.countDown();
+            return ackedBookies.size() == writeQuorumSizeToUseForTesting;
+        }
+    }
+
+    /**
+     * Test to verify that PendingAddOp waits for success condition from 
areAckedBookiesAdheringToPlacementPolicy
+     * before returning success to client. Also tests working of 
WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS and
+     * WRITE_TIMED_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS counters.
+     */
+    @Test
+    public void testEnforceMinNumFaultDomainsForWrite() throws Exception {
+        byte[] data = "foobar".getBytes();
+        byte[] password = "testPasswd".getBytes();
+
+        startNewBookie();
+        startNewBookie();
+
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        
conf.setEnsemblePlacementPolicy(MockRackawareEnsemblePlacementPolicy.class);
+
+        conf.setAddEntryTimeout(2);
+        conf.setAddEntryQuorumTimeout(4);
+        conf.setEnforceMinNumFaultDomainsForWrite(true);
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+
+        // Abnormal values for testing to prevent timeouts
+        BookKeeperTestClient bk = new BookKeeperTestClient(conf, 
statsProvider);
+        StatsLogger statsLogger = bk.getStatsLogger();
+
+        int ensembleSize = 3;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        MockRackawareEnsemblePlacementPolicy currPlacementPolicy =
+                (MockRackawareEnsemblePlacementPolicy) bk.getPlacementPolicy();
+        currPlacementPolicy.setConditionFirstInvocationLatch(countDownLatch);
+        currPlacementPolicy.setWriteQuorumSizeToUseForTesting(writeQuorumSize);
+
+        BookieSocketAddress bookieToSleep;
+
+        try (LedgerHandle lh = bk.createLedger(ensembleSize, writeQuorumSize, 
ackQuorumSize, digestType, password)) {
+            CountDownLatch sleepLatchCase1 = new CountDownLatch(1);
+            CountDownLatch sleepLatchCase2 = new CountDownLatch(1);
+
+            // Put all non ensemble bookies to sleep
+            LOG.info("Putting all non ensemble bookies to sleep.");
+            for (BookieServer bookieServer : bs) {
+                try {
+                    if 
(!lh.getCurrentEnsemble().contains(bookieServer.getLocalAddress())) {
+                        sleepBookie(bookieServer.getLocalAddress(), 
sleepLatchCase2);
+                    }
+                } catch (UnknownHostException ignored) {}
+            }
+
+            Thread writeToLedger = new Thread(() -> {
+                try {
+                    LOG.info("Initiating write for entry");
+                    long entryId = lh.addEntry(data);
+                    LOG.info("Wrote entry with entryId = {}", entryId);
+                } catch (InterruptedException | BKException ignored) {
+                }
+            });
+
+            bookieToSleep = lh.getCurrentEnsemble().get(0);
+
+            LOG.info("Putting picked bookie to sleep");
+            sleepBookie(bookieToSleep, sleepLatchCase1);
+
+            assertEquals(statsLogger
+                           
.getCounter(WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS)
+                           .get()
+                           .longValue(), 0);
+
+            // Trying to write entry
+            writeToLedger.start();
+
+            // Waiting and checking to make sure that write has not succeeded
+            countDownLatch.await(2, TimeUnit.SECONDS);
+            assertEquals("Write succeeded but should not have", -1, 
lh.lastAddConfirmed);
+
+            // Wake the bookie
+            sleepLatchCase1.countDown();
+
+            // Waiting and checking to make sure that write has succeeded
+            writeToLedger.join(1000);
+            assertEquals("Write did not succeed but should have", 0, 
lh.lastAddConfirmed);
+
+            assertEquals(statsLogger
+                           
.getCounter(WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS)
+                           .get()
+                           .longValue(), 1);
+
+            // AddEntry thread for second scenario
+            Thread writeToLedger2 = new Thread(() -> {
+                try {
+                    LOG.info("Initiating write for entry");
+                    long entryId = lh.addEntry(data);
+                    LOG.info("Wrote entry with entryId = {}", entryId);
+                } catch (InterruptedException | BKException ignored) {
+                }
+            });
+
+            bookieToSleep = lh.getCurrentEnsemble().get(1);
+
+            LOG.info("Putting picked bookie to sleep");
+            sleepBookie(bookieToSleep, sleepLatchCase2);
+
+            // Trying to write entry
+            writeToLedger2.start();
+
+            // Waiting and checking to make sure that write has failed
+            writeToLedger2.join(6000);
 
 Review comment:
   Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to