This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ad831d159ed [improve][broker] Fix ServiceUnitStateCompactionStrategy
to cover fast-forward cursor behavior after compaction (#20110)
ad831d159ed is described below
commit ad831d159ed649dda6cdc554cd77621f5bc63ee4
Author: Heesung Sohn <[email protected]>
AuthorDate: Mon Apr 17 22:57:48 2023 -0700
[improve][broker] Fix ServiceUnitStateCompactionStrategy to cover
fast-forward cursor behavior after compaction (#20110)
Master Issue: https://github.com/apache/pulsar/issues/16691
### Motivation
Raising a PR to implement: https://github.com/apache/pulsar/issues/16691
After the compaction, the cursor can fast-forward to the compacted horizon
when a large number of messages are compacted before the next read. Hence,
ServiceUnitStateCompactionStrategy also needs to cover this case. Currently,
the existing and slow(their states are far behind) tableviews with
ServiceUnitStateCompactionStrategy could not accept those compacted messages.
In the load balance extension context, this means the ownership data could be
inconsistent among brokers.
### Modifications
This PR
- fixes ServiceUnitStateCompactionStrategy to accept the state data if
its version is bigger than the current version +1.
- (minor fix) does not repeatedly update the replication_clusters in the
policies when creating the system namespace. This update redundantly triggers
ZK watchers when restarting brokers.
- sets closeWithoutWaitingClientDisconnect=true, upon unload(following
the same setting as the modular LM's)
(cherry picked from commit 6cfa4683a44e7cce39fa6cb70e0fe1fb3d5eae56)
---
.../apache/pulsar/PulsarClusterMetadataSetup.java | 17 +++--
.../channel/ServiceUnitStateChannelImpl.java | 2 +-
.../ServiceUnitStateCompactionStrategy.java | 10 ++-
.../compaction/StrategicTwoPhaseCompactor.java | 2 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 2 +
.../ServiceUnitStateCompactionStrategyTest.java | 4 ++
.../compaction/ServiceUnitStateCompactionTest.java | 79 ++++++++++++++++++++++
.../apache/pulsar/client/impl/TableViewImpl.java | 7 ++
8 files changed, 114 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 0badbda1afd..9b757c55ccd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -385,10 +385,19 @@ public class PulsarClusterMetadataSetup {
namespaceResources.createPolicies(namespaceName, policies);
} else {
log.info("Namespace {} already exists.", namespaceName);
- namespaceResources.setPolicies(namespaceName, policies -> {
- policies.replication_clusters.add(cluster);
- return policies;
- });
+ var replicaClusterFound = false;
+ var policiesOptional =
namespaceResources.getPolicies(namespaceName);
+ if (policiesOptional.isPresent() &&
policiesOptional.get().replication_clusters.contains(cluster)) {
+ replicaClusterFound = true;
+ }
+ if (!replicaClusterFound) {
+ namespaceResources.setPolicies(namespaceName, policies -> {
+ policies.replication_clusters.add(cluster);
+ return policies;
+ });
+ log.info("Updated namespace:{} policies. Added the replication
cluster:{}",
+ namespaceName, cluster);
+ }
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 68c6440e68e..b4c4e7fd5d4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -819,7 +819,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
- false,
+ true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS)
.thenApply(numUnloadedTopics -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
index ceb3ea3e9cb..72b05b5cd62 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -52,9 +52,13 @@ public class ServiceUnitStateCompactionStrategy implements
TopicCompactionStrate
return false;
}
- // Skip the compaction case where from = null and to.versionId > 1
- if (from != null && from.versionId() + 1 != to.versionId()) {
- return true;
+ if (from != null) {
+ if (from.versionId() == Long.MAX_VALUE && to.versionId() ==
Long.MIN_VALUE) { // overflow
+ } else if (from.versionId() >= to.versionId()) {
+ return true;
+ } else if (from.versionId() < to.versionId() - 1) { // Compacted
+ return false;
+ } // else from.versionId() == to.versionId() - 1 // continue to
check further
}
if (to.force()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index 557d4a65801..a6b09427427 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -379,7 +379,7 @@ public class StrategicTwoPhaseCompactor extends
TwoPhaseCompactor {
});
})
.thenCompose(v -> {
- log.info("Acking ledger id {}", phaseOneResult.firstId);
+ log.info("Acking ledger id {}", phaseOneResult.lastId);
return ((CompactionReaderImpl<T>) reader)
.acknowledgeCumulativeAsync(
phaseOneResult.lastId,
Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 9ab2467d0ef..b71eeb4745b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -592,6 +592,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
restartBroker();
pulsar1 = pulsar;
setPrimaryLoadManager();
+ admin.namespaces().setNamespaceReplicationClusters("public/default",
+ Sets.newHashSet(this.conf.getClusterName()));
var serviceUnitStateChannelPrimaryNew =
(ServiceUnitStateChannelImpl)
FieldUtils.readDeclaredField(primaryLoadManager,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
index 64964826af6..62de91dab29 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
@@ -85,6 +85,10 @@ public class ServiceUnitStateCompactionStrategyTest {
new ServiceUnitStateData(Owned, dst, src, 10),
new ServiceUnitStateData(Releasing, "broker2", dst, 5)));
+ assertFalse(strategy.shouldKeepLeft(
+ new ServiceUnitStateData(Owned, dst, src, 10),
+ new ServiceUnitStateData(Owned, "broker2", dst, 12)));
+
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
index 1a69a86f7c6..e4f0750a981 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -27,6 +27,9 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -49,6 +52,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.lang.reflect.FieldUtils;
@@ -69,6 +73,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -628,6 +633,80 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
}
+ @Test
+ public void testSlowReceiveTableviewAfterCompaction() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String strategyClassName = "topicCompactionStrategyClassName";
+
+ pulsarClient.newConsumer(schema)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .readCompacted(true)
+ .subscribe().close();
+
+ var tv = pulsar.getClient().newTableViewBuilder(schema)
+ .topic(topic)
+ .subscriptionName("slowTV")
+ .loadConf(Map.of(
+ strategyClassName,
+ ServiceUnitStateCompactionStrategy.class.getName()))
+ .create();
+
+ // Configure retention to ensue data is retained for reader
+ admin.namespaces().setRetention("my-property/use/my-ns",
+ new RetentionPolicies(-1, -1));
+
+ Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema)
+ .topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
+ .enableBatching(true)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ StrategicTwoPhaseCompactor compactor
+ = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+
+ var reader = ((CompletableFuture<ReaderImpl<ServiceUnitStateData>>)
FieldUtils
+ .readDeclaredField(tv, "reader", true)).get();
+ var consumer = spy(reader.getConsumer());
+ FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);
+ String bundle = "bundle1";
+ final AtomicInteger versionId = new AtomicInteger(0);
+ final AtomicInteger cnt = new AtomicInteger(1);
+ int msgAddCount = 1000; // has to be big enough to cover compacted
cursor fast-forward.
+ doAnswer(invocationOnMock -> {
+ if (cnt.decrementAndGet() == 0) {
+ var msg = consumer.receiveAsync();
+ for (int i = 0; i < msgAddCount; i++) {
+ producer.newMessage().key(bundle).value(
+ new ServiceUnitStateData(Owned, "broker" +
versionId.incrementAndGet(), true,
+ versionId.get())).send();
+ }
+ compactor.compact(topic, strategy).join();
+ return msg;
+ }
+ // Call the real method
+ reset(consumer);
+ return consumer.receiveAsync();
+ }).when(consumer).receiveAsync();
+ producer.newMessage().key(bundle).value(
+ new ServiceUnitStateData(Owned, "broker", true,
+ versionId.incrementAndGet())).send();
+ producer.newMessage().key(bundle).value(
+ new ServiceUnitStateData(Owned, "broker" +
versionId.incrementAndGet(), true,
+ versionId.get())).send();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+ () -> {
+ var val = tv.get(bundle);
+ assertNotNull(val);
+ assertEquals(val.dstBroker(), "broker" + versionId.get());
+ }
+ );
+
+ producer.close();
+ tv.close();
+ }
+
@Test
public void testBrokerRestartAfterCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index ff5b251ad55..77aba7e48cb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -192,6 +192,13 @@ public class TableViewImpl<T> implements TableView<T> {
if (compactionStrategy != null) {
T prev = data.get(key);
update = !compactionStrategy.shouldKeepLeft(prev, cur);
+ if (!update) {
+ log.info("Skipped the message from topic {}. key={}
value={} prev={}",
+ conf.getTopicName(),
+ key,
+ cur,
+ prev);
+ }
}
if (update) {