This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d6b931d0d1 [fix][broker] Added the skipped message handler for
ServiceUnitStateChannel (#20677)
8d6b931d0d1 is described below
commit 8d6b931d0d12681081750dacd370a584e87ee47f
Author: Heesung Sohn <[email protected]>
AuthorDate: Thu Jun 29 22:26:13 2023 -0700
[fix][broker] Added the skipped message handler for ServiceUnitStateChannel
(#20677)
### Motivation
When topic lookups race(and the first lookup returns too fast before the
second request gets deduped), the later lookup requests can timeout because the
skipped messages are ignored at the table view and do not notify the service
unit state channel.
ex:
m1: assign to b1 -> m2: owned by b1 -> m3: assign to b2 // m3 is skipped at
the tableview
When m3 is skipped, we better get the channel notified to return the lookup
request with the current owner(b1) instead of letting the request time out.
### Modifications
- Added the `handleSkippedMessage` and `setSkippedMsgHandler` in
`TopicCompactionStrategy`.
- `ServiceUnitStateChannel` registers
`ServiceUnitStateChannel.handleSkippedEvent` to
`ServiceUnitStateCompactionStrategy` by `setSkippedMsgHandler`.
- TableView calls `TopicCompactionStrategy.handleSkippedMessage` when
messages are skipped.
---
.../channel/ServiceUnitStateChannelImpl.java | 21 +++++++++++++++++++++
.../channel/ServiceUnitStateCompactionStrategy.java | 13 +++++++++++++
.../channel/ServiceUnitStateChannelTest.java | 6 +++---
.../compaction/TopicCompactionStrategyTest.java | 4 ++--
.../apache/pulsar/client/impl/TableViewImpl.java | 5 ++++-
.../common/topics/TopicCompactionStrategy.java | 21 ++++++++++++++++++---
6 files changed, 61 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index fbf3534e7d4..70fb8f9ba6f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -42,6 +42,7 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static
org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import com.google.common.annotations.VisibleForTesting;
@@ -94,6 +95,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -321,6 +323,13 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
tableview.listen((key, value) -> handle(key, value));
+ var strategy = (ServiceUnitStateCompactionStrategy)
TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG);
+ if (strategy == null) {
+ String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is
null.";
+ log.error(err);
+ throw new IllegalStateException(err);
+ }
+ strategy.setSkippedMsgHandler((key, value) ->
handleSkippedEvent(key));
if (debug) {
log.info("Successfully started the channel tableview.");
}
@@ -695,6 +704,18 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
+ private void handleSkippedEvent(String serviceUnit) {
+ var getOwnerRequest = getOwnerRequests.get(serviceUnit);
+ if (getOwnerRequest != null) {
+ var data = tableview.get(serviceUnit);
+ if (data.state() == Owned) {
+ getOwnerRequest.complete(data.dstBroker());
+ getOwnerRequests.remove(serviceUnit);
+ stateChangeListeners.notify(serviceUnit, data, null);
+ }
+ }
+ }
+
private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data)
{
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
index 72b05b5cd62..6a98b79be81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import com.google.common.annotations.VisibleForTesting;
+import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
@@ -29,6 +30,7 @@ import
org.apache.pulsar.common.topics.TopicCompactionStrategy;
public class ServiceUnitStateCompactionStrategy implements
TopicCompactionStrategy<ServiceUnitStateData> {
private final Schema<ServiceUnitStateData> schema;
+ private BiConsumer<String, ServiceUnitStateData> skippedMsgHandler;
private boolean checkBrokers = true;
@@ -36,6 +38,17 @@ public class ServiceUnitStateCompactionStrategy implements
TopicCompactionStrate
schema = Schema.JSON(ServiceUnitStateData.class);
}
+ public void setSkippedMsgHandler(BiConsumer<String, ServiceUnitStateData>
skippedMsgHandler) {
+ this.skippedMsgHandler = skippedMsgHandler;
+ }
+
+ @Override
+ public void handleSkippedMessage(String key, ServiceUnitStateData cur) {
+ if (skippedMsgHandler != null) {
+ skippedMsgHandler.accept(key, cur);
+ }
+ }
+
@Override
public Schema<ServiceUnitStateData> getSchema() {
return schema;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 1263170bec4..d8b4f1d75d2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -53,6 +53,7 @@ import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertNull;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
@@ -199,7 +200,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
super.internalCleanup();
}
- @Test(priority = 0)
+ @Test(priority = -1)
public void channelOwnerTest() throws Exception {
var channelOwner1 = channel1.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
var channelOwner2 = channel2.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
@@ -947,8 +948,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
} catch (CompletionException e) {
ex = e;
}
- assertNotNull(ex);
- assertEquals(TimeoutException.class, ex.getCause().getClass());
+ assertNull(ex);
assertEquals(Optional.of(lookupServiceAddress1),
channel2.getOwnerAsync(bundle).get());
assertEquals(Optional.of(lookupServiceAddress1),
channel1.getOwnerAsync(bundle).get());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
index 0ecd09606ce..50d4cdc7b0a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
@@ -41,13 +41,13 @@ public class TopicCompactionStrategyTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testLoadInvalidTopicCompactionStrategy() {
- TopicCompactionStrategy.load("uknown");
+ TopicCompactionStrategy.load("uknown", "uknown");
}
@Test
public void testNumericOrderCompactionStrategy() {
TopicCompactionStrategy<Integer> strategy =
-
TopicCompactionStrategy.load(NumericOrderCompactionStrategy.class.getCanonicalName());
+ TopicCompactionStrategy.load("numeric",
NumericOrderCompactionStrategy.class.getCanonicalName());
Assert.assertFalse(strategy.shouldKeepLeft(1, 2));
Assert.assertTrue(strategy.shouldKeepLeft(2, 1));
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 77aba7e48cb..560636f9462 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static
org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -65,7 +66,8 @@ public class TableViewImpl<T> implements TableView<T> {
this.immutableData = Collections.unmodifiableMap(data);
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
- this.compactionStrategy =
TopicCompactionStrategy.load(conf.getTopicCompactionStrategyClassName());
+ this.compactionStrategy =
+ TopicCompactionStrategy.load(TABLE_VIEW_TAG,
conf.getTopicCompactionStrategyClassName());
ReaderBuilder<T> readerBuilder = client.newReader(schema)
.topic(conf.getTopicName())
.startMessageId(MessageId.earliest)
@@ -198,6 +200,7 @@ public class TableViewImpl<T> implements TableView<T> {
key,
cur,
prev);
+ compactionStrategy.handleSkippedMessage(key, cur);
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
index f06374b234f..51fb6742961 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.topics;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Schema;
/**
@@ -45,6 +47,9 @@ import org.apache.pulsar.client.api.Schema;
*/
public interface TopicCompactionStrategy<T> {
+ String TABLE_VIEW_TAG = "table-view";
+ Map<String, TopicCompactionStrategy> INSTANCES = new ConcurrentHashMap<>();
+
/**
* Returns the schema object for this strategy.
* @return
@@ -60,17 +65,27 @@ public interface TopicCompactionStrategy<T> {
*/
boolean shouldKeepLeft(T prev, T cur);
- static TopicCompactionStrategy load(String
topicCompactionStrategyClassName) {
+ default void handleSkippedMessage(String key, T cur) {
+ }
+
+
+ static TopicCompactionStrategy load(String tag, String
topicCompactionStrategyClassName) {
if (topicCompactionStrategyClassName == null) {
return null;
}
+
try {
Class<?> clazz = Class.forName(topicCompactionStrategyClassName);
- Object instance = clazz.getDeclaredConstructor().newInstance();
- return (TopicCompactionStrategy) instance;
+ TopicCompactionStrategy instance = (TopicCompactionStrategy)
clazz.getDeclaredConstructor().newInstance();
+ INSTANCES.put(tag, instance);
+ return instance;
} catch (Exception e) {
throw new IllegalArgumentException(
"Error when loading topic compaction strategy: " +
topicCompactionStrategyClassName, e);
}
}
+
+ static TopicCompactionStrategy getInstance(String tag) {
+ return INSTANCES.get(tag);
+ }
}