This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 1c6abaee817 [cherry-pick][branch-2.9] cherry-pick #17736 problem and
DnsResolverTest license header (#18640)
1c6abaee817 is described below
commit 1c6abaee817fda700286d281225e054be86b0789
Author: congbo <[email protected]>
AuthorDate: Sun Nov 27 23:58:47 2022 +0800
[cherry-pick][branch-2.9] cherry-pick #17736 problem and DnsResolverTest
license header (#18640)
1. Fix problems left over from the cherry-pick of https://github.com/apache/pulsar/pull/17736 (unresolved merge-conflict markers in BrokerBkEnsemblesTests)
2. Fix a bug introduced by the cherry-pick of 17526:
https://github.com/poorbarcode/pulsar/blob/4db04f509451dc1f17b96009b2f5e268c1ea644b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L2015
The condition should be !createTopicFuture.isPresent(), not
createTopicFuture.isPresent()
3. Fix the WhiteboxImpl import in NegativeAcksTest
4. Fix the cherry-pick of https://github.com/apache/pulsar/pull/18283:
testDeleteTopicAndSchemaForV1 failed because the thrown exception message was wrong
5. run branch-2.9 test
### Motivation
fix branch-2.9 test
### Documentation
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
### Matching PR in forked repository
---
.../broker/admin/impl/PersistentTopicsBase.java | 2 +-
.../pulsar/broker/service/BrokerService.java | 2 +-
.../broker/service/BrokerBkEnsemblesTests.java | 142 ---------------------
.../pulsar/client/impl/NegativeAcksTest.java | 2 +-
.../pulsar/common/util/netty/DnsResolverTest.java | 2 +-
5 files changed, 4 insertions(+), 146 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7725f1cd98c..4444c69be0f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1019,7 +1019,7 @@ public class PersistentTopicsBase extends AdminResource {
Throwable t = e.getCause();
log.error("[{}] Failed to delete topic {}", clientAppId(),
topicName, t);
if (t instanceof TopicBusyException) {
- throw new RestException(Status.PRECONDITION_FAILED, "Topic has
active producers/subscriptions");
+ throw new RestException(Status.PRECONDITION_FAILED,
t.getMessage());
} else if (isManagedLedgerNotFoundException(e)) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 3418d9f08e3..e23cb663dec 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1850,7 +1850,7 @@ public class BrokerService implements Closeable {
public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture =
findTopicFutureInCache(topic);
- if (createTopicFuture.isPresent()){
+ if (!createTopicFuture.isPresent()){
return CompletableFuture.completedFuture(null);
}
return removeTopicFutureFromCache(topic.getName(),
createTopicFuture.get());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index d5480e84a0a..aa63b224a9d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -276,148 +276,6 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
consumer.close();
}
-<<<<<<< HEAD
-=======
- @Test
- public void testTruncateCorruptDataLedger() throws Exception {
- // Ensure intended state for autoSkipNonRecoverableData
-
admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData",
"false");
-
- @Cleanup
- PulsarClient client = PulsarClient.builder()
- .serviceUrl(pulsar.getWebServiceAddress())
- .statsInterval(0, TimeUnit.SECONDS)
- .build();
-
- final int totalMessages = 100;
- final int totalDataLedgers = 5;
- final int entriesPerLedger = totalMessages / totalDataLedgers;
-
- final String tenant = "prop";
- try {
- admin.tenants().createTenant(tenant, new
TenantInfoImpl(Sets.newHashSet("role1", "role2"),
- Sets.newHashSet(config.getClusterName())));
- } catch (Exception e) {
-
- }
- final String ns1 = tenant + "/crash-broker";
- try {
- admin.namespaces().createNamespace(ns1,
Sets.newHashSet(config.getClusterName()));
- } catch (Exception e) {
-
- }
-
- final String topic1 = "persistent://" + ns1 + "/my-topic-" +
System.currentTimeMillis();
-
- // Create subscription
- Consumer<byte[]> consumer =
client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
- .receiverQueueSize(5).subscribe();
-
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
- ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
- ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().iterator().next();
- Field configField = ManagedCursorImpl.class.getDeclaredField("config");
- configField.setAccessible(true);
- // Create multiple data-ledger
- ManagedLedgerConfig config = (ManagedLedgerConfig)
configField.get(cursor);
- config.setMaxEntriesPerLedger(entriesPerLedger);
- config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
- // bookkeeper client
- Field bookKeeperField =
ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
- bookKeeperField.setAccessible(true);
- // Create multiple data-ledger
- BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
-
- // (1) publish messages in 10 data-ledgers each with 20 entries under
managed-ledger
- Producer<byte[]> producer =
client.newProducer().topic(topic1).create();
- for (int i = 0; i < totalMessages; i++) {
- String message = "my-message-" + i;
- producer.send(message.getBytes());
- }
-
- // validate: consumer is able to consume msg and close consumer after
reading 1 entry
- Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
- consumer.close();
-
- NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
- Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
- Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
- long firstLedgerToDelete = lastLedger.getKey();
-
- // (2) delete first 4 data-ledgers
- ledgerInfo.entrySet().forEach(entry -> {
- if (!entry.equals(lastLedger)) {
- assertEquals(entry.getValue().getEntries(), entriesPerLedger);
- try {
- bookKeeper.deleteLedger(entry.getKey());
- } catch (Exception e) {
- log.warn("failed to delete ledger {}", entry.getKey(), e);
- }
- }
- });
-
- // create 5 more ledgers
- for (int i = 0; i < totalMessages; i++) {
- String message = "my-message2-" + i;
- producer.send(message.getBytes());
- }
-
- // Admin should be able to truncate the topic
- admin.topics().truncate(topic1);
-
- ledgerInfo.entrySet().forEach(entry -> {
- log.warn("found ledger: {}", entry.getKey());
- assertNotEquals(firstLedgerToDelete, entry.getKey());
- });
-
- // Currently, ledger deletion is async and failed deletion
- // does not actually fail truncation but logs an exception
- // and creates scheduled task to retry
- Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
- LedgerMetadata meta = bookKeeper
- .getLedgerMetadata(firstLedgerToDelete)
- .exceptionally(e -> null)
- .get();
- assertEquals(null, meta, "ledger should be deleted " +
firstLedgerToDelete);
- });
-
- // Should not throw, deleting absent ledger must be a noop
- // unless PulsarManager returned a wrong error which
- // got translated to BKUnexpectedConditionException
- try {
- bookKeeper.deleteLedger(firstLedgerToDelete);
- } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException
bke) {
- // pass
- }
-
- producer.close();
- consumer.close();
- }
-
- @Test
- public void testDeleteLedgerFactoryCorruptLedger() throws Exception {
- ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory();
- ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("test");
-
- // bookkeeper client
- Field bookKeeperField =
ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
- bookKeeperField.setAccessible(true);
- // Create multiple data-ledger
- BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
-
- ml.addEntry("dummy-entry-1".getBytes());
-
- NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
- long lastLedger = ledgerInfo.lastEntry().getKey();
-
- ml.close();
- bookKeeper.deleteLedger(lastLedger);
-
- // BK ledger is deleted, factory should not throw on delete
- factory.delete("test");
- }
-
->>>>>>> 63d4cf20e7... ManagedLedger: move to FENCED state in case of
BadVersionException (#17736)
@Test(timeOut = 20000)
public void testTopicWithWildCardChar() throws Exception {
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 94727632a92..6298ec15619 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -38,7 +38,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.Awaitility;
-import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
+import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
index 0ccb960e798..80ab1ea3484 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information