This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9213e58 Attempt to fix flakyness of
BrokerBkEnsemblesTests.testSkipCorruptDataLedger (#2318)
9213e58 is described below
commit 9213e58a74dc26096cc132eec709ae3d95ade095
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Aug 7 09:11:28 2018 +0900
Attempt to fix flakyness of
BrokerBkEnsemblesTests.testSkipCorruptDataLedger (#2318)
---
.../broker/service/BrokerBkEnsemblesTests.java | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index ce023c3..94b2226 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
@@ -54,8 +57,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Sets;
-
/**
*/
public class BrokerBkEnsemblesTests {
@@ -236,8 +237,11 @@ public class BrokerBkEnsemblesTests {
*
* @throws Exception
*/
- @Test(timeOut = 6000)
+ @Test
public void testSkipCorruptDataLedger() throws Exception {
+ // Ensure intended state for autoSkipNonRecoverableData
+
admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData",
"false");
+
PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0,
TimeUnit.SECONDS)
.build();
@@ -246,9 +250,13 @@ public class BrokerBkEnsemblesTests {
final int totalDataLedgers = 5;
final int entriesPerLedger = totalMessages / totalDataLedgers;
- admin.namespaces().createNamespace(ns1);
+ try {
+ admin.namespaces().createNamespace(ns1);
+ } catch (Exception e) {
- final String topic1 = "persistent://" + ns1 + "/my-topic";
+ }
+
+ final String topic1 = "persistent://" + ns1 + "/my-topic-" +
System.currentTimeMillis();
// Create subscription
Consumer<byte[]> consumer =
client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
@@ -287,6 +295,7 @@ public class BrokerBkEnsemblesTests {
// (2) delete first 4 data-ledgers
ledgerInfo.entrySet().forEach(entry -> {
if (!entry.equals(lastLedger)) {
+ assertEquals(entry.getValue().getEntries(), entriesPerLedger);
try {
bookKeeper.deleteLedger(entry.getKey());
} catch (Exception e) {
@@ -322,7 +331,7 @@ public class BrokerBkEnsemblesTests {
// (5) consumer will be able to consume 20 messages from last
non-deleted ledger
consumer =
client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
for (int i = 0; i < entriesPerLedger; i++) {
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = consumer.receive();
System.out.println(i);
consumer.acknowledge(msg);
}
@@ -330,7 +339,6 @@ public class BrokerBkEnsemblesTests {
producer.close();
consumer.close();
client.close();
-
}
private static final Logger LOG =
LoggerFactory.getLogger(BrokerBkEnsemblesTests.class);