This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1cd1aef3c74 [improve][broker] PIP-192 Added ServiceUnitStateCompactionStrategy (#19045)
1cd1aef3c74 is described below
commit 1cd1aef3c74fac0a2ded99da05b658578d8481e7
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Jan 31 00:37:11 2023 -0800
[improve][broker] PIP-192 Added ServiceUnitStateCompactionStrategy (#19045)
---
.../org/apache/pulsar/broker/PulsarService.java | 15 +
.../extensions/channel/ServiceUnitState.java | 4 +-
.../channel/ServiceUnitStateChannelImpl.java | 81 +-
.../ServiceUnitStateCompactionStrategy.java | 89 +++
.../broker/service/persistent/PersistentTopic.java | 21 +-
.../compaction/StrategicTwoPhaseCompactor.java | 1 +
.../channel/ServiceUnitStateChannelTest.java | 84 ++-
.../ServiceUnitStateCompactionStrategyTest.java | 90 +++
.../extensions/channel/ServiceUnitStateTest.java | 4 +-
.../compaction/ServiceUnitStateCompactionTest.java | 831 +++++++++++++++++++++
.../apache/pulsar/client/impl/TableViewImpl.java | 2 +-
11 files changed, 1168 insertions(+), 54 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 1532b28343c..06b41e46636 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -146,6 +146,7 @@ import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.ThreadDumpUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -198,6 +199,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private TopicPoliciesService topicPoliciesService =
TopicPoliciesService.DISABLED;
private BookKeeperClientFactory bkClientFactory;
private Compactor compactor;
+ private StrategicTwoPhaseCompactor strategicCompactor;
private ResourceUsageTransportManager resourceUsageTransportManager;
private ResourceGroupService resourceGroupServiceManager;
@@ -1473,6 +1475,19 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return this.compactor;
}
+ public StrategicTwoPhaseCompactor newStrategicCompactor() throws
PulsarServerException {
+ return new StrategicTwoPhaseCompactor(this.getConfiguration(),
+ getClient(), getBookKeeperClient(),
+ getCompactorExecutor());
+ }
+
+ public synchronized StrategicTwoPhaseCompactor getStrategicCompactor()
throws PulsarServerException {
+ if (this.strategicCompactor == null) {
+ this.strategicCompactor = newStrategicCompactor();
+ }
+ return this.strategicCompactor;
+ }
+
protected synchronized OrderedScheduler
getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java
index cd1092a26ea..3225c0ba7bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java
@@ -66,7 +66,9 @@ public enum ServiceUnitState {
Splitting; // the service unit(e.g. bundle) is in the process of splitting.
private static Map<ServiceUnitState, Set<ServiceUnitState>>
validTransitions = Map.of(
- Free, Set.of(Owned, Assigned),
+ // (Free -> Released | Splitting) transitions are required
+ // when the topic is compacted in the middle of transfer or split.
+ Free, Set.of(Owned, Assigned, Released, Splitting),
Owned, Set.of(Assigned, Splitting, Free),
Assigned, Set.of(Owned, Released, Free),
Released, Set.of(Owned, Free),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index a476be974a3..38e8afa50f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -43,7 +43,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -101,7 +101,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private long totalCleanupCnt = 0;
private long totalBrokerCleanupTombstoneCnt = 0;
private long totalServiceUnitCleanupTombstoneCnt = 0;
- private long totalServiceUnitCleanupErrorCnt = 0;
+ private AtomicLong totalCleanupErrorCnt = new AtomicLong();
private long totalCleanupScheduledCnt = 0;
private long totalCleanupIgnoredCnt = 0;
private long totalCleanupCancelledCnt = 0;
@@ -175,10 +175,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
tableview = pulsar.getClient().newTableViewBuilder(schema)
.topic(TOPIC)
- // TODO: enable CompactionStrategy
+ .loadConf(Map.of(
+ "topicCompactionStrategyClassName",
+
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
- // TODO: schedule listen instead of foreachAndListen
- tableview.forEachAndListen((key, value) -> handle(key, value));
+ tableview.listen((key, value) -> handle(key, value));
log.debug("Successfully started the channel tableview.");
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
@@ -332,8 +333,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
ServiceUnitState state = data == null ? Free : data.state();
-
- // TODO : Add state validation in tableview by the compaction strategy
switch (state) {
case Owned -> handleOwnEvent(serviceUnit, data);
case Assigned -> handleAssignEvent(serviceUnit, data);
@@ -599,7 +598,16 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
.delayedExecutor(delayInSecs, TimeUnit.SECONDS,
pulsar.getLoadManagerExecutor());
totalCleanupScheduledCnt++;
return CompletableFuture
- .runAsync(() -> doCleanup(broker), delayed);
+ .runAsync(() -> {
+ try {
+ doCleanup(broker);
+ } catch (Throwable e) {
+ log.error("Failed to run the cleanup job
for the broker {}, "
+ +
"totalCleanupErrorCnt:{}.",
+ broker,
totalCleanupErrorCnt.incrementAndGet(), e);
+ }
+ }
+ , delayed);
});
log.info("Scheduled ownership cleanup for broker:{} with delay:{}
secs. Pending clean jobs:{}.",
@@ -610,8 +618,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private void doCleanup(String broker) {
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
- AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger();
- AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger();
+ int serviceUnitTombstoneCnt = 0;
+ long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
for (Map.Entry<String, ServiceUnitStateData> etr :
tableview.entrySet()) {
ServiceUnitStateData stateData = etr.getValue();
String serviceUnit = etr.getKey();
@@ -619,14 +627,14 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
|| StringUtils.equals(broker, stateData.sourceBroker())) {
log.info("Cleaning ownership serviceUnit:{}, stateData:{}.",
serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
- if (e == null) {
- serviceUnitTombstoneCnt.incrementAndGet();
- } else {
- log.error("Failed cleaning the ownership
serviceUnit:{}, stateData:{}.",
- serviceUnit, stateData);
- serviceUnitTombstoneErrorCnt.incrementAndGet();
+ if (e != null) {
+ log.error("Failed cleaning the ownership
serviceUnit:{}, stateData:{}, "
+ + "cleanupErrorCnt:{}.",
+ serviceUnit, stateData,
+ totalCleanupErrorCnt.incrementAndGet() -
totalCleanupErrorCntStart);
}
});
+ serviceUnitTombstoneCnt++;
}
}
@@ -636,26 +644,22 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.error("Failed to flush the in-flight messages.", e);
}
- if (serviceUnitTombstoneCnt.get() > 0) {
+ if (serviceUnitTombstoneCnt > 0) {
this.totalCleanupCnt++;
- this.totalServiceUnitCleanupTombstoneCnt +=
serviceUnitTombstoneCnt.get();
+ this.totalServiceUnitCleanupTombstoneCnt +=
serviceUnitTombstoneCnt;
this.totalBrokerCleanupTombstoneCnt++;
}
- if (serviceUnitTombstoneErrorCnt.get() > 0) {
- this.totalServiceUnitCleanupErrorCnt +=
serviceUnitTombstoneErrorCnt.get();
- }
-
double cleanupTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
// TODO: clean load data stores
log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
+ "Published tombstone for orphan service units:
serviceUnitTombstoneCnt:{}, "
- + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ",
+ + "approximate cleanupErrorCnt:{}, metrics:{} ",
broker,
cleanupTime,
serviceUnitTombstoneCnt,
- serviceUnitTombstoneErrorCnt,
+ totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
printCleanupMetrics());
cleanupJobs.remove(broker);
}
@@ -675,8 +679,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
long startTime = System.nanoTime();
Set<String> inactiveBrokers = new HashSet<>();
Set<String> activeBrokers = new HashSet<>(brokers);
- AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger();
- AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger();
+ int serviceUnitTombstoneCnt = 0;
+ long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
long now = System.currentTimeMillis();
for (Map.Entry<String, ServiceUnitStateData> etr :
tableview.entrySet()) {
String serviceUnit = etr.getKey();
@@ -690,14 +694,14 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
- if (e == null) {
- serviceUnitTombstoneCnt.incrementAndGet();
- } else {
- log.error("Failed cleaning the ownership
serviceUnit:{}, stateData:{}.",
- serviceUnit, stateData);
- serviceUnitTombstoneErrorCnt.incrementAndGet();
+ if (e != null) {
+ log.error("Failed cleaning the ownership
serviceUnit:{}, stateData:{}, "
+ + "cleanupErrorCnt:{}.",
+ serviceUnit, stateData,
+ totalCleanupErrorCnt.incrementAndGet() -
totalCleanupErrorCntStart);
}
});
+ serviceUnitTombstoneCnt++;
}
}
@@ -711,22 +715,21 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.error("Failed to flush the in-flight messages.", e);
}
- if (serviceUnitTombstoneCnt.get() > 0) {
- this.totalServiceUnitCleanupTombstoneCnt +=
serviceUnitTombstoneCnt.get();
+ if (serviceUnitTombstoneCnt > 0) {
+ this.totalServiceUnitCleanupTombstoneCnt +=
serviceUnitTombstoneCnt;
}
- this.totalServiceUnitCleanupErrorCnt +=
serviceUnitTombstoneErrorCnt.get();
double monitorTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
log.info("Completed the ownership monitor run in {} ms. "
+ "Scheduled cleanups for inactiveBrokers:{}.
inactiveBrokerCount:{}. "
+ "Published tombstone for orphan service units:
serviceUnitTombstoneCnt:{}, "
- + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ",
+ + "approximate cleanupErrorCnt:{}, metrics:{} ",
monitorTime,
inactiveBrokers,
inactiveBrokers.size(),
serviceUnitTombstoneCnt,
- serviceUnitTombstoneErrorCnt,
+ totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
printCleanupMetrics());
}
@@ -734,13 +737,13 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private String printCleanupMetrics() {
return String.format(
"{totalCleanupCnt:%d, totalBrokerCleanupTombstoneCnt:%d, "
- + "totalServiceUnitCleanupTombstoneCnt:%d,
totalServiceUnitCleanupErrorCnt:%d, "
+ + "totalServiceUnitCleanupTombstoneCnt:%d,
totalCleanupErrorCnt:%d, "
+ "totalCleanupScheduledCnt%d,
totalCleanupIgnoredCnt:%d, totalCleanupCancelledCnt:%d, "
+ " activeCleanupJobs:%d}",
totalCleanupCnt,
totalBrokerCleanupTombstoneCnt,
totalServiceUnitCleanupTombstoneCnt,
- totalServiceUnitCleanupErrorCnt,
+ totalCleanupErrorCnt.get(),
totalCleanupScheduledCnt,
totalCleanupIgnoredCnt,
totalCleanupCancelledCnt,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
new file mode 100644
index 00000000000..2b21f830dda
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.channel;
+
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+
+public class ServiceUnitStateCompactionStrategy implements
TopicCompactionStrategy<ServiceUnitStateData> {
+
+ private final Schema<ServiceUnitStateData> schema;
+
+ private boolean checkBrokers = true;
+
+ public ServiceUnitStateCompactionStrategy() {
+ schema = Schema.JSON(ServiceUnitStateData.class);
+ }
+
+ @Override
+ public Schema<ServiceUnitStateData> getSchema() {
+ return schema;
+ }
+
+ @VisibleForTesting
+ public void checkBrokers(boolean check) {
+ this.checkBrokers = check;
+ }
+
+ @Override
+ public boolean shouldKeepLeft(ServiceUnitStateData from,
ServiceUnitStateData to) {
+ ServiceUnitState prevState = from == null ? Free : from.state();
+ ServiceUnitState state = to == null ? Free : to.state();
+ if (!ServiceUnitState.isValidTransition(prevState, state)) {
+ return true;
+ }
+
+ if (checkBrokers) {
+ if (prevState == Free && (state == Assigned || state == Owned)) {
+ // Free -> Assigned || Owned broker check
+ return StringUtils.isBlank(to.broker());
+ } else if (prevState == Owned && state == Assigned) {
+ // Owned -> Assigned(transfer) broker check
+ return !StringUtils.equals(from.broker(), to.sourceBroker())
+ || StringUtils.isBlank(to.broker())
+ || StringUtils.equals(from.broker(), to.broker());
+ } else if (prevState == Assigned && state == Released) {
+ // Assigned -> Released(transfer) broker check
+ return !StringUtils.equals(from.broker(), to.broker())
+ || !StringUtils.equals(from.sourceBroker(),
to.sourceBroker());
+ } else if (prevState == Released && state == Owned) {
+ // Released -> Owned(transfer) broker check
+ return !StringUtils.equals(from.broker(), to.broker())
+ || !StringUtils.equals(from.sourceBroker(),
to.sourceBroker());
+ } else if (prevState == Assigned && state == Owned) {
+ // Assigned -> Owned broker check
+ return !StringUtils.equals(from.broker(), to.broker())
+ || !StringUtils.equals(from.sourceBroker(),
to.sourceBroker());
+ } else if (prevState == Owned && state == Splitting) {
+ // Owned -> Splitting broker check
+ return !StringUtils.equals(from.broker(), to.broker());
+ }
+ }
+
+ return false;
+ }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index dda9c89b726..d009d3778f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -78,6 +78,8 @@ import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
@@ -152,6 +154,7 @@ import
org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
@@ -203,6 +206,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private CompletableFuture<Long> currentCompaction =
CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
private final CompactedTopic compactedTopic;
+ // TODO: Create compaction strategy from topic policy when exposing
strategic compaction to users.
+ private static Map<String, TopicCompactionStrategy> strategicCompactionMap
= Map.of(
+ ServiceUnitStateChannelImpl.TOPIC,
+ new ServiceUnitStateCompactionStrategy());
+
private CompletableFuture<MessageIdImpl> currentOffload =
CompletableFuture.completedFuture(
(MessageIdImpl) MessageId.earliest);
@@ -1571,6 +1579,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
if (backlogEstimate > compactionThreshold) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "topic:{} backlogEstimate:{} is bigger than
compactionThreshold:{}. Triggering "
+ + "compaction", topic,
backlogEstimate, compactionThreshold);
+ }
try {
triggerCompaction();
} catch (AlreadyRunningException are) {
@@ -3000,7 +3013,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
public synchronized void triggerCompaction()
throws PulsarServerException, AlreadyRunningException {
if (currentCompaction.isDone()) {
- currentCompaction =
brokerService.pulsar().getCompactor().compact(topic);
+
+ if (strategicCompactionMap.containsKey(topic)) {
+ currentCompaction =
brokerService.pulsar().getStrategicCompactor()
+ .compact(topic, strategicCompactionMap.get(topic));
+ } else {
+ currentCompaction =
brokerService.pulsar().getCompactor().compact(topic);
+ }
} else {
throw new AlreadyRunningException("Compaction already in
progress");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index bb0850efab4..9dc4ec649b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -385,6 +385,7 @@ public class StrategicTwoPhaseCompactor extends
TwoPhaseCompactor {
promise.completeExceptionally(e);
return;
}
+ outstanding.release(MAX_OUTSTANDING);
promise.complete(null);
return;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index ad4d0cb2f0b..a16c2be6612 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.channel;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
import static
org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
@@ -44,6 +45,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -51,6 +53,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarServerException;
@@ -60,6 +63,7 @@ import
org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -89,6 +93,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
@Override
protected void setup() throws Exception {
conf.setAllowAutoTopicCreation(true);
+ conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
super.internalSetup(conf);
admin.tenants().createTenant("pulsar", createDefaultTenantInfo());
@@ -289,8 +294,6 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- // TODO: check conflict resolution
- // assertEquals(assignedAddr1, ownerAddr1);
assertEquals(getOwnerRequests1.size(), 0);
assertEquals(getOwnerRequests2.size(), 0);
}
@@ -567,7 +570,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalBrokerCleanupTombstoneCnt"));
assertEquals(2, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupTombstoneCnt"));
- assertEquals(0, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupErrorCnt"));
+ assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupErrorCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupIgnoredCnt"));
assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupCancelledCnt"));
@@ -592,7 +595,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalBrokerCleanupTombstoneCnt"));
assertEquals(2, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupTombstoneCnt"));
- assertEquals(0, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupErrorCnt"));
+ assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupErrorCnt"));
assertEquals(2, getCleanupMetric(leaderChannel,
"totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupIgnoredCnt"));
assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupCancelledCnt"));
@@ -608,7 +611,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalBrokerCleanupTombstoneCnt"));
assertEquals(2, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupTombstoneCnt"));
- assertEquals(0, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupErrorCnt"));
+ assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupErrorCnt"));
assertEquals(2, getCleanupMetric(leaderChannel,
"totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupIgnoredCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalCleanupCancelledCnt"));
@@ -626,7 +629,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalBrokerCleanupTombstoneCnt"));
assertEquals(2, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupTombstoneCnt"));
- assertEquals(0, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupErrorCnt"));
+ assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupErrorCnt"));
assertEquals(3, getCleanupMetric(leaderChannel,
"totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupIgnoredCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalCleanupCancelledCnt"));
@@ -644,7 +647,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(2, getCleanupMetric(leaderChannel,
"totalBrokerCleanupTombstoneCnt"));
assertEquals(4, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupTombstoneCnt"));
- assertEquals(0, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupErrorCnt"));
+ assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupErrorCnt"));
assertEquals(3, getCleanupMetric(leaderChannel,
"totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupIgnoredCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalCleanupCancelledCnt"));
@@ -669,7 +672,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(2, getCleanupMetric(leaderChannel,
"totalBrokerCleanupTombstoneCnt"));
assertEquals(4, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupTombstoneCnt"));
- assertEquals(0, getCleanupMetric(leaderChannel,
"totalServiceUnitCleanupErrorCnt"));
+ assertEquals(0, getCleanupMetric(leaderChannel,
"totalCleanupErrorCnt"));
assertEquals(3, getCleanupMetric(leaderChannel,
"totalCleanupScheduledCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalCleanupIgnoredCnt"));
assertEquals(1, getCleanupMetric(leaderChannel,
"totalCleanupCancelledCnt"));
@@ -682,6 +685,62 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
true);
}
+ @Test(priority = 10)
+ public void conflictAndCompactionTest() throws ExecutionException,
InterruptedException, TimeoutException,
+ IllegalAccessException, PulsarClientException,
PulsarServerException {
+
+ var producer = (Producer<ServiceUnitStateData>)
FieldUtils.readDeclaredField(channel1, "producer", true);
+ producer.newMessage().key(bundle).send();
+ var owner1 = channel1.getOwnerAsync(bundle);
+ var owner2 = channel2.getOwnerAsync(bundle);
+ assertNull(owner1.get());
+ assertNull(owner2.get());
+
+ var assigned1 = channel1.publishAssignEventAsync(bundle,
lookupServiceAddress1);
+ assertNotNull(assigned1);
+
+ waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
+ waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+ String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS);
+ assertEquals(lookupServiceAddress1, assignedAddr1);
+
+ FieldUtils.writeDeclaredField(channel2,
+ "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
+ var assigned2 = channel2.publishAssignEventAsync(bundle,
lookupServiceAddress2);
+ assertNotNull(assigned2);
+ Exception ex = null;
+ try {
+ assigned2.join();
+ } catch (CompletionException e) {
+ ex = e;
+ }
+ assertNotNull(ex);
+ assertEquals(TimeoutException.class, ex.getCause().getClass());
+ assertEquals(lookupServiceAddress1,
channel2.getOwnerAsync(bundle).get());
+ assertEquals(lookupServiceAddress1,
channel1.getOwnerAsync(bundle).get());
+
+ var compactor = spy (pulsar1.getStrategicCompactor());
+ FieldUtils.writeDeclaredField(pulsar1, "strategicCompactor",
compactor, true);
+ FieldUtils.writeDeclaredField(pulsar2, "strategicCompactor",
compactor, true);
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(140, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(compactor, times(1))
+ .compact(eq(ServiceUnitStateChannelImpl.TOPIC),
any()));
+
+ var channel3 = new ServiceUnitStateChannelImpl(pulsar1);
+ channel3.start();
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(
+ channel3.getOwnerAsync(bundle).get(),
lookupServiceAddress1));
+ channel3.close();
+ FieldUtils.writeDeclaredField(channel2,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ }
+
+
// TODO: add the channel recovery test when broker registry is added.
private static ConcurrentOpenHashMap<String,
CompletableFuture<Optional<String>>> getOwnerRequests(
@@ -768,7 +827,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
if (actual == null) {
return true;
} else {
- return actual.state() != ServiceUnitState.Owned;
+ return actual.state() != Owned;
}
});
}
@@ -784,6 +843,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
private static long getCleanupMetric(ServiceUnitStateChannel channel,
String metric)
throws IllegalAccessException {
- return (long) FieldUtils.readDeclaredField(channel, metric, true);
+ Object var = FieldUtils.readDeclaredField(channel, metric, true);
+ if (var instanceof AtomicLong) {
+ return ((AtomicLong) var).get();
+ } else {
+ return (long) var;
+ }
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
new file mode 100644
index 00000000000..49b55f7660a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.channel;
+
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ServiceUnitStateCompactionStrategyTest {
+    ServiceUnitStateCompactionStrategy strategy = new ServiceUnitStateCompactionStrategy();
+
+    /** Creates state data for {@code state} with the fixed broker name "broker". */
+    ServiceUnitStateData data(ServiceUnitState state) {
+        return new ServiceUnitStateData(state, "broker");
+    }
+
+    /** Creates state data passing {@code dst} as the second and "broker" as the third constructor argument. */
+    ServiceUnitStateData data(ServiceUnitState state, String dst) {
+        return new ServiceUnitStateData(state, dst, "broker");
+    }
+
+    /** Creates state data passing {@code dst} as the second and {@code src} as the third constructor argument. */
+    ServiceUnitStateData data(ServiceUnitState state, String src, String dst) {
+        return new ServiceUnitStateData(state, dst, src);
+    }
+
+    /** Asserts that {@code shouldKeepLeft(left, right)} returns true. */
+    private void expectLeftKept(ServiceUnitStateData left, ServiceUnitStateData right) {
+        assertTrue(strategy.shouldKeepLeft(left, right));
+    }
+
+    /** Asserts that {@code shouldKeepLeft(left, right)} returns false. */
+    private void expectRightKept(ServiceUnitStateData left, ServiceUnitStateData right) {
+        assertFalse(strategy.shouldKeepLeft(left, right));
+    }
+
+    /**
+     * Walks every (left state, right state) pair, plus several broker-name variations,
+     * and checks the decision returned by {@code shouldKeepLeft}.
+     */
+    @Test
+    public void test() throws InterruptedException {
+        final String dstBroker = "dst";
+
+        // left = Free
+        expectLeftKept(data(Free), data(Free));
+        expectRightKept(data(Free), data(Assigned));
+        expectLeftKept(data(Free), data(Assigned, ""));
+        expectRightKept(data(Free), data(Owned));
+        expectLeftKept(data(Free), data(Owned, ""));
+        expectRightKept(data(Free), data(Released));
+        expectRightKept(data(Free), data(Splitting));
+
+        // left = Assigned
+        expectRightKept(data(Assigned), data(Free));
+        expectLeftKept(data(Assigned), data(Assigned));
+        expectLeftKept(data(Assigned, "dst2"), data(Owned, dstBroker));
+        expectLeftKept(data(Assigned, "src1", dstBroker), data(Owned, "src2", dstBroker));
+        expectRightKept(data(Assigned), data(Owned));
+        expectLeftKept(data(Assigned, "dst2"), data(Released, dstBroker));
+        expectLeftKept(data(Assigned, "src1", dstBroker), data(Released, "src2", dstBroker));
+        expectRightKept(data(Assigned, dstBroker), data(Released, dstBroker));
+        expectLeftKept(data(Assigned), data(Splitting));
+
+        // left = Owned
+        expectRightKept(data(Owned), data(Free));
+        expectLeftKept(data(Owned), data(Assigned));
+        expectLeftKept(data(Owned), data(Assigned, ""));
+        expectLeftKept(data(Owned), data(Assigned, "src", dstBroker));
+        expectRightKept(data(Owned), data(Assigned, dstBroker));
+        expectLeftKept(data(Owned), data(Owned));
+        expectLeftKept(data(Owned), data(Released));
+        expectLeftKept(data(Owned, "dst2"), data(Splitting, dstBroker));
+        expectRightKept(data(Owned), data(Splitting));
+
+        // left = Released
+        expectRightKept(data(Released), data(Free));
+        expectLeftKept(data(Released), data(Assigned));
+        expectLeftKept(data(Released, "dst2"), data(Owned, dstBroker));
+        expectLeftKept(data(Released, "src1", dstBroker), data(Owned, "src2", dstBroker));
+        expectRightKept(data(Released), data(Owned));
+        expectLeftKept(data(Released), data(Released));
+        expectLeftKept(data(Released), data(Splitting));
+
+        // left = Splitting
+        expectRightKept(data(Splitting), data(Free));
+        expectLeftKept(data(Splitting), data(Assigned));
+        expectLeftKept(data(Splitting), data(Owned));
+        expectLeftKept(data(Splitting), data(Released));
+        expectLeftKept(data(Splitting), data(Splitting));
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java
index 304d1df29c9..69e6a2d204c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java
@@ -36,8 +36,8 @@ public class ServiceUnitStateTest {
assertFalse(ServiceUnitState.isValidTransition(Free, Free));
assertTrue(ServiceUnitState.isValidTransition(Free, Assigned));
assertTrue(ServiceUnitState.isValidTransition(Free, Owned));
- assertFalse(ServiceUnitState.isValidTransition(Free, Released));
- assertFalse(ServiceUnitState.isValidTransition(Free, Splitting));
+ assertTrue(ServiceUnitState.isValidTransition(Free, Released));
+ assertTrue(ServiceUnitState.isValidTransition(Free, Splitting));
assertTrue(ServiceUnitState.isValidTransition(Assigned, Free));
assertFalse(ServiceUnitState.isValidTransition(Assigned, Assigned));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
new file mode 100644
index 00000000000..41eaa640d28
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest {
+ private ScheduledExecutorService compactionScheduler;
+ private BookKeeper bk;
+ private Schema<ServiceUnitStateData> schema;
+ private ServiceUnitStateCompactionStrategy strategy;
+
+ private ServiceUnitState testState0 = Free;
+ private ServiceUnitState testState1 = Free;
+ private ServiceUnitState testState2 = Free;
+ private ServiceUnitState testState3 = Free;
+ private ServiceUnitState testState4 = Free;
+
+ private static Random RANDOM = new Random();
+
+
+    /**
+     * Builds the message value for {@code state} and {@code broker}.
+     * Returns {@code null} when the state is {@code Free}.
+     */
+    private ServiceUnitStateData testValue(ServiceUnitState state, String broker) {
+        return state == Free ? null : new ServiceUnitStateData(state, broker);
+    }
+
+    /** Advances {@code testState0} to a randomly picked valid next state and builds a value for it. */
+    private ServiceUnitStateData testValue0(String broker) {
+        testState0 = nextValidState(testState0);
+        return testValue(testState0, broker);
+    }
+
+    /** Advances {@code testState1} to a randomly picked valid next state and builds a value for it. */
+    private ServiceUnitStateData testValue1(String broker) {
+        testState1 = nextValidState(testState1);
+        return testValue(testState1, broker);
+    }
+
+    /** Advances {@code testState2} to a randomly picked valid next state and builds a value for it. */
+    private ServiceUnitStateData testValue2(String broker) {
+        testState2 = nextValidState(testState2);
+        return testValue(testState2, broker);
+    }
+
+    /** Advances {@code testState3} to a randomly picked valid next state and builds a value for it. */
+    private ServiceUnitStateData testValue3(String broker) {
+        testState3 = nextValidState(testState3);
+        return testValue(testState3, broker);
+    }
+
+    /** Advances {@code testState4} to a randomly picked valid next state and builds a value for it. */
+    private ServiceUnitStateData testValue4(String broker) {
+        testState4 = nextValidState(testState4);
+        return testValue(testState4, broker);
+    }
+
+    /**
+     * Picks, at random, a state that {@code from} may validly transition to,
+     * never choosing {@code Free} or {@code Splitting}.
+     */
+    private ServiceUnitState nextValidState(ServiceUnitState from) {
+        List<ServiceUnitState> reachable = new ArrayList<>();
+        for (ServiceUnitState to : ServiceUnitState.values()) {
+            if (to != Free && to != Splitting && isValidTransition(from, to)) {
+                reachable.add(to);
+            }
+        }
+        return reachable.get(RANDOM.nextInt(reachable.size()));
+    }
+
+    /**
+     * Picks, at random, a state that {@code from} may NOT validly transition to.
+     * Returns {@code null} when every transition from {@code from} is valid.
+     */
+    private ServiceUnitState nextInvalidState(ServiceUnitState from) {
+        List<ServiceUnitState> unreachable = new ArrayList<>();
+        for (ServiceUnitState to : ServiceUnitState.values()) {
+            if (!isValidTransition(from, to)) {
+                unreachable.add(to);
+            }
+        }
+        if (unreachable.isEmpty()) {
+            return null;
+        }
+        return unreachable.get(RANDOM.nextInt(unreachable.size()));
+    }
+
+    /**
+     * Returns {@code [Owned]} when {@code from} is {@code Assigned};
+     * returns an empty list for every other state and for {@code null}.
+     */
+    private List<ServiceUnitState> nextStatesToNull(ServiceUnitState from) {
+        if (from == Assigned) {
+            return List.of(Owned);
+        }
+        return List.of();
+    }
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+
+ admin.clusters().createCluster("use",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("my-property",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("use")));
+ admin.namespaces().createNamespace("my-property/use/my-ns");
+
+ compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
+ bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null);
+ schema = Schema.JSON(ServiceUnitStateData.class);
+ strategy = new ServiceUnitStateCompactionStrategy();
+ strategy.checkBrokers(false);
+
+ }
+
+
+    /**
+     * Tears down the mocked broker and stops the compaction executor.
+     *
+     * @throws Exception if the base-class teardown fails
+     */
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            // Shut the executor down even when the base teardown throws, so its thread
+            // does not outlive the test method.
+            if (compactionScheduler != null) {
+                compactionScheduler.shutdownNow();
+            }
+        }
+    }
+
+
+    /**
+     * Result of {@code generateTestData()}.
+     *
+     * @param topic    topic the messages were published to
+     * @param expected per-key value recorded whenever {@code strategy.shouldKeepLeft(prev, value)} was false
+     * @param all      every published (key, value) pair, in publish order
+     */
+    public record TestData(
+            String topic,
+            Map<String, ServiceUnitStateData> expected,
+            List<Pair<String, ServiceUnitStateData>> all) {
+    }
+    /**
+     * Publishes 20 keyed messages (keys "key0".."key4") to a fixed topic, choosing for each
+     * message either a valid or an invalid next state, and tracks what the compaction
+     * strategy is expected to retain.
+     *
+     * @return the topic name, the expected per-key values and the full publish history
+     * @throws PulsarAdminException  if the retention policy cannot be set
+     * @throws PulsarClientException if producing or subscribing fails
+     */
+    TestData generateTestData() throws PulsarAdminException, PulsarClientException {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        final int numMessages = 20;
+        final int maxKeys = 5;
+
+        // Configure retention to ensure data is retained for reader
+        admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1));
+
+        Map<String, ServiceUnitStateData> expected = new HashMap<>();
+        List<Pair<String, ServiceUnitStateData>> all = new ArrayList<>();
+        Random r = new Random(0);
+
+        // The producer is only needed while generating data; close it once all sends completed.
+        try (Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create()) {
+
+            // Create the subscription before publishing anything.
+            pulsarClient.newConsumer(schema)
+                    .topic(topic)
+                    .subscriptionName("sub1")
+                    .readCompacted(true)
+                    .subscribe().close();
+
+            for (int j = 0; j < numMessages; j++) {
+                int keyIndex = r.nextInt(maxKeys);
+                String key = "key" + keyIndex;
+                ServiceUnitStateData prev = expected.get(key);
+                ServiceUnitState prevState = prev == null ? Free : prev.state();
+                ServiceUnitState state = r.nextBoolean()
+                        ? nextInvalidState(prevState)
+                        : nextValidState(prevState);
+                ServiceUnitStateData value = new ServiceUnitStateData(state, key + ":" + j);
+                producer.newMessage().key(key).value(value).send();
+                // Mirror the strategy: the new value replaces the old one unless the left is kept.
+                if (!strategy.shouldKeepLeft(prev, value)) {
+                    expected.put(key, value);
+                }
+                all.add(Pair.of(key, value));
+            }
+        }
+        return new TestData(topic, expected, all);
+    }
+
+    /**
+     * Compacts the generated topic with the strategy, then checks that a read-compacted
+     * consumer receives the expected per-key values and that a non-compacted consumer
+     * receives the complete publish history in order.
+     */
+    @Test
+    public void testCompaction() throws Exception {
+        TestData testData = generateTestData();
+        var topic = testData.topic;
+        var expected = testData.expected;
+        var all = testData.all;
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
+        // Compacted topic ledger should have same number of entry equals to number of unique key.
+        // The entry-count assertion below is currently disabled.
+        //Assert.assertEquals(internalStats.compactedLedger.entries, expected.size());
+        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+        Assert.assertFalse(internalStats.compactedLedger.offloaded);
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<ServiceUnitStateData> consumer =
+                     pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                             .readCompacted(true).subscribe()) {
+            do {
+                Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
+                // A timed receive yields null on timeout; fail with a clear message instead of an NPE.
+                assertNotNull(m, "timed out waiting for a compacted message");
+                Assert.assertEquals(m.getValue(), expected.remove(m.getKey()));
+            } while (!expected.isEmpty());
+            Assert.assertTrue(expected.isEmpty());
+        }
+
+        // can get full backlog if read compacted disabled
+        try (Consumer<ServiceUnitStateData> consumer =
+                     pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                             .readCompacted(false).subscribe()) {
+            do {
+                Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
+                assertNotNull(m, "timed out waiting for a backlog message");
+                Pair<String, ServiceUnitStateData> expectedMessage = all.remove(0);
+                Assert.assertEquals(m.getKey(), expectedMessage.getLeft());
+                Assert.assertEquals(m.getValue(), expectedMessage.getRight());
+            } while (!all.isEmpty());
+            Assert.assertTrue(all.isEmpty());
+        }
+    }
+
+    /**
+     * Same scenario as the consumer-based compaction test, but reads through {@code Reader}
+     * instances starting at {@code MessageId.earliest}.
+     */
+    @Test
+    public void testCompactionWithReader() throws Exception {
+        TestData testData = generateTestData();
+        var topic = testData.topic;
+        var expected = testData.expected;
+        var all = testData.all;
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        // reader with readCompacted enabled only get compacted entries
+        try (Reader<ServiceUnitStateData> reader =
+                     pulsarClient.newReader(schema).topic(topic).readCompacted(true)
+                             .startMessageId(MessageId.earliest).create()) {
+            do {
+                Message<ServiceUnitStateData> m = reader.readNext(2, TimeUnit.SECONDS);
+                // A timed read yields null on timeout; fail with a clear message instead of an NPE.
+                assertNotNull(m, "timed out waiting for a compacted message");
+                Assert.assertEquals(m.getValue(), expected.remove(m.getKey()));
+            } while (!expected.isEmpty());
+            Assert.assertTrue(expected.isEmpty());
+        }
+
+        // can get full backlog if read compacted disabled
+        try (Reader<ServiceUnitStateData> reader =
+                     pulsarClient.newReader(schema).topic(topic).readCompacted(false)
+                             .startMessageId(MessageId.earliest).create()) {
+            do {
+                Message<ServiceUnitStateData> m = reader.readNext(2, TimeUnit.SECONDS);
+                assertNotNull(m, "timed out waiting for a backlog message");
+                Pair<String, ServiceUnitStateData> expectedMessage = all.remove(0);
+                Assert.assertEquals(m.getKey(), expectedMessage.getLeft());
+                Assert.assertEquals(m.getValue(), expectedMessage.getRight());
+            } while (!all.isEmpty());
+            Assert.assertTrue(all.isEmpty());
+        }
+    }
+
+
+    /**
+     * Checks a table view configured with the compaction strategy class twice: once while the
+     * data is being produced (before compaction) and once on a fresh view after compaction.
+     */
+    @Test
+    public void testCompactionWithTableview() throws Exception {
+        var liveView = pulsar.getClient().newTableViewBuilder(schema)
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .loadConf(Map.of(
+                        "topicCompactionStrategyClassName",
+                        ServiceUnitStateCompactionStrategy.class.getName()))
+                .create();
+
+        // Reach into the view's strategy instance and turn broker checking off.
+        var liveStrategy = (ServiceUnitStateCompactionStrategy)
+                FieldUtils.readDeclaredField(liveView, "compactionStrategy", true);
+        liveStrategy.checkBrokers(false);
+
+        TestData testData = generateTestData();
+        var topic = testData.topic;
+        var expected = testData.expected;
+        var expectedCopy = new HashMap<>(expected);
+
+        Awaitility.await()
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(expectedCopy.size(), liveView.size()));
+
+        for (var entry : liveView.entrySet()) {
+            Assert.assertEquals(expectedCopy.remove(entry.getKey()), entry.getValue());
+            if (expectedCopy.isEmpty()) {
+                break;
+            }
+        }
+
+        Assert.assertTrue(expectedCopy.isEmpty());
+        liveView.close();
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        // a table view created after compaction
+        var compactedView = pulsar.getClient().newTableViewBuilder(schema)
+                .topic(topic)
+                .loadConf(Map.of(
+                        "topicCompactionStrategyClassName",
+                        ServiceUnitStateCompactionStrategy.class.getName()))
+                .create();
+        for (var entry : compactedView.entrySet()) {
+            Assert.assertEquals(expected.remove(entry.getKey()), entry.getValue());
+            if (expected.isEmpty()) {
+                break;
+            }
+        }
+        Assert.assertTrue(expected.isEmpty());
+        compactedView.close();
+    }
+
+
+    /**
+     * A read-compacted consumer sees all three "key0" messages before compaction runs,
+     * and only the last one ("content2") afterwards.
+     */
+    @Test
+    public void testReadCompactedBeforeCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .create();
+
+        pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        List<String> contents = List.of("content0", "content1", "content2");
+        for (String content : contents) {
+            producer.newMessage().key("key0").value(testValue0(content)).send();
+        }
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            for (String content : contents) {
+                Message<ServiceUnitStateData> received = consumer.receive();
+                Assert.assertEquals(received.getKey(), "key0");
+                Assert.assertEquals(received.getValue().broker(), content);
+            }
+        }
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> received = consumer.receive();
+            Assert.assertEquals(received.getKey(), "key0");
+            Assert.assertEquals(received.getValue().broker(), "content2");
+        }
+    }
+
+    /**
+     * After compacting three "key0" messages and publishing a fourth, a read-compacted
+     * consumer receives "content2" followed by "content3".
+     */
+    @Test
+    public void testReadEntriesAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .create();
+
+        pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        for (String content : List.of("content0", "content1", "content2")) {
+            producer.newMessage().key("key0").value(testValue0(content)).send();
+        }
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        // published after the compaction finished
+        producer.newMessage().key("key0").value(testValue0("content3")).send();
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            for (String content : List.of("content2", "content3")) {
+                Message<ServiceUnitStateData> received = consumer.receive();
+                Assert.assertEquals(received.getKey(), "key0");
+                Assert.assertEquals(received.getValue().broker(), content);
+            }
+        }
+    }
+
+    /**
+     * After compaction, seeking to the earliest position yields only "content2" for a
+     * read-compacted consumer and all three messages for a non-compacted consumer.
+     */
+    @Test
+    public void testSeekEarliestAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .create();
+
+        List<String> contents = List.of("content0", "content1", "content2");
+        for (String content : contents) {
+            producer.newMessage().key("key0").value(testValue0(content)).send();
+        }
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            consumer.seek(MessageId.earliest);
+            Message<ServiceUnitStateData> received = consumer.receive();
+            Assert.assertEquals(received.getKey(), "key0");
+            Assert.assertEquals(received.getValue().broker(), "content2");
+        }
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(false).subscribe()) {
+            consumer.seek(MessageId.earliest);
+
+            for (String content : contents) {
+                Message<ServiceUnitStateData> received = consumer.receive();
+                Assert.assertEquals(received.getKey(), "key0");
+                Assert.assertEquals(received.getValue().broker(), content);
+            }
+        }
+    }
+
+    /**
+     * The compacted view ("content2") is readable before a broker restart, subscribing fails
+     * with {@code PulsarClientException} while the broker is stopped, and the compacted view
+     * is readable again once the broker is back.
+     */
+    @Test
+    public void testBrokerRestartAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .create();
+
+        pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        for (String content : List.of("content0", "content1", "content2")) {
+            producer.newMessage().key("key0").value(testValue0(content)).send();
+        }
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> received = consumer.receive();
+            Assert.assertEquals(received.getKey(), "key0");
+            Assert.assertEquals(received.getValue().broker(), "content2");
+        }
+
+        stopBroker();
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            consumer.receive();
+            Assert.fail("Shouldn't have been able to receive anything");
+        } catch (PulsarClientException e) {
+            // correct behaviour
+        }
+        startBroker();
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> received = consumer.receive();
+            Assert.assertEquals(received.getKey(), "key0");
+            Assert.assertEquals(received.getValue().broker(), "content2");
+        }
+    }
+
+    /**
+     * Compacting a topic that has no messages yet must not prevent a message published
+     * afterwards ("content0") from being delivered to a read-compacted consumer.
+     */
+    @Test
+    public void testCompactEmptyTopic() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .create();
+
+        pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        producer.newMessage().key("key0").value(testValue0("content0")).send();
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> received = consumer.receive();
+            Assert.assertEquals(received.getKey(), "key0");
+            Assert.assertEquals(received.getValue().broker(), "content0");
+        }
+    }
+
+    /**
+     * Publishes three "key1" messages through a batching producer (batch size 3) and a fourth
+     * through a second producer, compacts, and expects a read-compacted consumer to receive
+     * "my-message-4" first.
+     */
+    @Test
+    public void testWholeBatchCompactedOut() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        try (Producer<ServiceUnitStateData> producerNormal = pulsarClient.newProducer(schema)
+                     .topic(topic)
+                     .enableBatching(true)
+                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                     .create();
+             Producer<ServiceUnitStateData> producerBatch = pulsarClient.newProducer(schema)
+                     .topic(topic)
+                     .maxPendingMessages(3)
+                     .enableBatching(true)
+                     .batchingMaxMessages(3)
+                     .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                     .create()) {
+            for (String payload : List.of("my-message-1", "my-message-2", "my-message-3")) {
+                producerBatch.newMessage().key("key1").value(testValue1(payload)).sendAsync();
+            }
+            producerNormal.newMessage().key("key1").value(testValue1("my-message-4")).send();
+        }
+
+        // compact the topic
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> received = consumer.receive();
+            Assert.assertEquals(received.getKey(), "key1");
+            Assert.assertEquals(new String(received.getValue().broker()), "my-message-4");
+        }
+    }
+
+    /**
+     * Publishes values for keys "1", "2" and "3", then null values for "1" and "2", compacts,
+     * and expects the first message seen by a read-compacted consumer to carry key "3".
+     */
+    // Explicit annotation for consistency with the sibling tests; the class-level @Test
+    // already covers public methods.
+    @Test
+    public void testCompactionWithLastDeletedKey() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value(testValue(Owned, "1")).send();
+        producer.newMessage().key("2").value(testValue(Owned, "3")).send();
+        producer.newMessage().key("3").value(testValue(Owned, "5")).send();
+        producer.newMessage().key("1").value(null).send();
+        producer.newMessage().key("2").value(null).send();
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        Set<String> expected = Sets.newHashSet("3");
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
+            // A timed receive yields null on timeout; fail with a clear message instead of an NPE.
+            assertNotNull(m, "timed out waiting for the surviving key");
+            assertTrue(expected.remove(m.getKey()));
+        }
+    }
+
+    /**
+     * Publishes values for keys "1" and "2" followed by null values for both, compacts, and
+     * expects a read-compacted consumer to receive nothing within two seconds.
+     */
+    @Test(timeOut = 20000)
+    public void testEmptyCompactionLedger() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        producer.newMessage().key("1").value(testValue(Owned, "1")).send();
+        producer.newMessage().key("2").value(testValue(Owned, "3")).send();
+        for (String deletedKey : List.of("1", "2")) {
+            producer.newMessage().key(deletedKey).value(null).send();
+        }
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> received = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(received);
+        }
+    }
+
+ @Test(timeOut = 20000) // Publishes only null-valued messages for one key, compacts, and expects a readCompacted consumer starting at Earliest to receive nothing.
+ public void testAllEmptyCompactionLedger() throws Exception {
+ final String topic =
+
"persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" +
UUID.randomUUID().toString();
+
+ final int messages = 10;
+
+ // 1. Create a producer and publish ten null-valued messages, all under key "1".
+ ProducerBuilder<ServiceUnitStateData> builder =
pulsarClient.newProducer(schema).topic(topic);
+ builder.batchingMaxMessages(messages / 5); // 10 / 5 = 2
+
+ Producer<ServiceUnitStateData> producer = builder.create();
+
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+ for (int i = 0; i < messages; i++) {
+
futures.add(producer.newMessage().key("1").value(null).sendAsync());
+ }
+
+ FutureUtil.waitForAll(futures).get(); // block on the combined future of all async sends
+
+ // 2. Compact the topic with the strategy under test.
+ StrategicTwoPhaseCompactor compactor
+ = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor.compact(topic, strategy).get(); // get() waits for the compaction result before reading
+
+ // Subscribe with readCompacted enabled, starting from the earliest position, and try to read one message.
+ try (Consumer<ServiceUnitStateData> consumer =
pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+
.readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe())
{
+ Message<ServiceUnitStateData> m = consumer.receive(2,
TimeUnit.SECONDS);
+ assertNull(m); // nothing may arrive within the 2-second receive timeout
+ }
+ }
+
+ @Test(timeOut = 20000) // Compacts twice with ten non-null messages for the same key before each run; a readCompacted consumer must then see exactly one message, the last one published.
+ public void testCompactMultipleTimesWithoutEmptyMessage()
+ throws PulsarClientException, ExecutionException,
InterruptedException {
+ final String topic =
+
"persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage"
+ UUID.randomUUID()
+ .toString();
+
+ final int messages = 10;
+ final String key = "1";
+
+ // 1. Create a batching producer and publish ten messages for the key, built by testValue0 from "0".."9".
+ ProducerBuilder<ServiceUnitStateData> builder =
pulsarClient.newProducer(schema).topic(topic);
+ builder.enableBatching(true);
+
+
+ Producer<ServiceUnitStateData> producer = builder.create();
+
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+ for (int i = 0; i < messages; i++) {
+ futures.add(producer.newMessage().key(key).value(testValue0((i +
""))).sendAsync());
+ }
+
+ FutureUtil.waitForAll(futures).get(); // block on the combined future of all async sends
+
+ // 2. Compact the topic for the first time.
+ StrategicTwoPhaseCompactor compactor
+ = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor.compact(topic, strategy).get();
+
+ // 3. Send ten more messages for the same key, built from "10".."19".
+ futures.clear(); // drop the futures from step 1 so only the new sends are awaited
+ for (int i = 0; i < messages; i++) {
+ futures.add(producer.newMessage().key(key).value(testValue0((i +
10 + ""))).sendAsync());
+ }
+ FutureUtil.waitForAll(futures).get();
+
+ // 4. Compact again, reusing the same compactor instance.
+ compactor.compact(topic, strategy).get();
+
+ try (Consumer<ServiceUnitStateData> consumer =
pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+
.readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe())
{
+ Message<ServiceUnitStateData> m1 = consumer.receive();
+ assertNotNull(m1);
+ assertEquals(m1.getKey(), key);
+ assertEquals(m1.getValue().broker(), "19"); // "19" is the value of the last message sent in step 3
+ Message<ServiceUnitStateData> none = consumer.receive(2,
TimeUnit.SECONDS);
+ assertNull(none); // no second message may arrive within the 2-second receive timeout
+ }
+ }
+
+ @Test(timeOut = 200000) // Checks what readCompacted consumers receive when messages are published after a compaction, after the key is nulled and re-compacted, and after further publishes with no compaction.
+ public void testReadUnCompacted()
+ throws PulsarClientException, ExecutionException,
InterruptedException {
+ final String topic =
"persistent://my-property/use/my-ns/testReadUnCompacted" +
UUID.randomUUID().toString();
+
+ final int messages = 10;
+ final String key = "1";
+
+ // 1. Create a producer and publish ten messages for the key, built by testValue0 from "0".."9".
+ ProducerBuilder<ServiceUnitStateData> builder =
pulsarClient.newProducer(schema).topic(topic);
+ builder.batchingMaxMessages(messages / 5); // 10 / 5 = 2
+
+ Producer<ServiceUnitStateData> producer = builder.create();
+
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+ for (int i = 0; i < messages; i++) {
+ futures.add(producer.newMessage().key(key).value(testValue0((i +
""))).sendAsync());
+ }
+
+ FutureUtil.waitForAll(futures).get(); // block on the combined future of all async sends
+
+ // 2. Compact the topic.
+ StrategicTwoPhaseCompactor compactor
+ = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor.compact(topic, strategy).get();
+
+ // 3. Send ten more messages ("10".."19"); no compaction runs before they are read below.
+ futures.clear(); // drop the futures from step 1 so only the new sends are awaited
+ for (int i = 0; i < messages; i++) {
+ futures.add(producer.newMessage().key(key).value(testValue0((i +
10 + ""))).sendAsync());
+ }
+ FutureUtil.waitForAll(futures).get();
+ try (Consumer<ServiceUnitStateData> consumer =
pulsarClient.newConsumer(schema)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .readCompacted(true)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
+ for (int i = 0; i < 11; i++) { // 11 messages expected: "9" (last value published before the compaction), then "10".."19" published after it
+ Message<ServiceUnitStateData> received = consumer.receive();
+ assertNotNull(received);
+ assertEquals(received.getKey(), key);
+ assertEquals(received.getValue().broker(), i + 9 + "");
+ consumer.acknowledge(received);
+ }
+ Message<ServiceUnitStateData> none = consumer.receive(2,
TimeUnit.SECONDS);
+ assertNull(none); // nothing further may arrive within the 2-second receive timeout
+ }
+
+ // 4. Publish one message per state returned by nextStatesToNull(testState0), then a null value for the key.
+ for (ServiceUnitState state : nextStatesToNull(testState0)) { // NOTE(review): presumably these intermediate states make the final null value an accepted transition for the strategy - confirm against nextStatesToNull
+ producer.newMessage().key(key).value(new
ServiceUnitStateData(state, "xx")).send();
+ }
+ producer.newMessage().key(key).value(null).send();
+
+ // 5. Compact the topic again; a fresh subscription is then expected to receive nothing.
+ compactor.compact(topic, strategy).get();
+
+ try (Consumer<ServiceUnitStateData> consumer =
pulsarClient.newConsumer(schema)
+ .topic(topic)
+ .subscriptionName("sub2")
+ .readCompacted(true)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
+ Message<ServiceUnitStateData> none = consumer.receive(2,
TimeUnit.SECONDS);
+ assertNull(none); // nothing may arrive within the 2-second receive timeout
+ }
+
+ for (int i = 0; i < messages; i++) { // NOTE(review): futures is not cleared here, so it still holds the step-3 futures; waitForAll below awaits those as well as the new sends
+ futures.add(producer.newMessage().key(key).value(testValue0((i +
20 + ""))).sendAsync());
+ }
+ FutureUtil.waitForAll(futures).get();
+
+ try (Consumer<ServiceUnitStateData> consumer =
pulsarClient.newConsumer(schema)
+ .topic(topic)
+ .subscriptionName("sub3")
+ .readCompacted(true)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
+ for (int i = 0; i < 10; i++) { // no compaction ran after the "20".."29" sends, and all ten are expected in publish order
+ Message<ServiceUnitStateData> received = consumer.receive();
+ assertNotNull(received);
+ assertEquals(received.getKey(), key);
+ assertEquals(received.getValue().broker(), i + 20 + "");
+ consumer.acknowledge(received);
+ }
+ Message<ServiceUnitStateData> none = consumer.receive(2,
TimeUnit.SECONDS);
+ assertNull(none); // nothing further may arrive within the 2-second receive timeout
+ }
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 21792ef3893..81771126f76 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -188,9 +188,9 @@ public class TableViewImpl<T> implements TableView<T> {
cur);
}
- T prev = data.get(key);
boolean update = true;
if (compactionStrategy != null) {
+ T prev = data.get(key);
update = !compactionStrategy.shouldKeepLeft(prev, cur);
}