This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d6b931d0d1 [fix][broker] Added the skipped message handler for 
ServiceUnitStateChannel (#20677)
8d6b931d0d1 is described below

commit 8d6b931d0d12681081750dacd370a584e87ee47f
Author: Heesung Sohn <[email protected]>
AuthorDate: Thu Jun 29 22:26:13 2023 -0700

    [fix][broker] Added the skipped message handler for ServiceUnitStateChannel 
(#20677)
    
    ### Motivation
    
    When topic lookups race(and the first lookup returns too fast before the 
second request gets deduped), the later lookup requests can timeout because the 
skipped messages are ignored at the table view and do not notify the service 
unit state channel.
    
    ex:
    
    m1: assign to b1 -> m2: owned by b1 -> m3: assign to b2 // m3 is skipped at 
the tableview
    
    When m3 is skipped, we better get the channel notified to return the lookup 
request with the current owner(b1) instead of letting the request time out.
    
    ### Modifications
    
    - Added the `handleSkippedMessage` and `setSkippedMsgHandler` in 
`TopicCompactionStrategy`.
    - `ServiceUnitStateChannel` registers 
`ServiceUnitStateChannel.handleSkippedEvent` to 
`ServiceUnitStateCompactionStrategy` by `setSkippedMsgHandler`.
    - TableView calls `TopicCompactionStrategy.handleSkippedMessage` when 
messages are skipped.
---
 .../channel/ServiceUnitStateChannelImpl.java        | 21 +++++++++++++++++++++
 .../channel/ServiceUnitStateCompactionStrategy.java | 13 +++++++++++++
 .../channel/ServiceUnitStateChannelTest.java        |  6 +++---
 .../compaction/TopicCompactionStrategyTest.java     |  4 ++--
 .../apache/pulsar/client/impl/TableViewImpl.java    |  5 ++++-
 .../common/topics/TopicCompactionStrategy.java      | 21 ++++++++++++++++++---
 6 files changed, 61 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index fbf3534e7d4..70fb8f9ba6f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -42,6 +42,7 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
 import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static 
org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
 import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
 import com.google.common.annotations.VisibleForTesting;
@@ -94,6 +95,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -321,6 +323,13 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                             
ServiceUnitStateCompactionStrategy.class.getName()))
                     .create();
             tableview.listen((key, value) -> handle(key, value));
+            var strategy = (ServiceUnitStateCompactionStrategy) 
TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG);
+            if (strategy == null) {
+                String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is 
null.";
+                log.error(err);
+                throw new IllegalStateException(err);
+            }
+            strategy.setSkippedMsgHandler((key, value) -> 
handleSkippedEvent(key));
             if (debug) {
                 log.info("Successfully started the channel tableview.");
             }
@@ -695,6 +704,18 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
+    private void handleSkippedEvent(String serviceUnit) {
+        var getOwnerRequest = getOwnerRequests.get(serviceUnit);
+        if (getOwnerRequest != null) {
+            var data = tableview.get(serviceUnit);
+            if (data.state() == Owned) {
+                getOwnerRequest.complete(data.dstBroker());
+                getOwnerRequests.remove(serviceUnit);
+                stateChangeListeners.notify(serviceUnit, data, null);
+            }
+        }
+    }
+
     private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) 
{
         var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
         if (getOwnerRequest != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
index 72b05b5cd62..6a98b79be81 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
 import com.google.common.annotations.VisibleForTesting;
+import java.util.function.BiConsumer;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
@@ -29,6 +30,7 @@ import 
org.apache.pulsar.common.topics.TopicCompactionStrategy;
 public class ServiceUnitStateCompactionStrategy implements 
TopicCompactionStrategy<ServiceUnitStateData> {
 
     private final Schema<ServiceUnitStateData> schema;
+    private BiConsumer<String, ServiceUnitStateData> skippedMsgHandler;
 
     private boolean checkBrokers = true;
 
@@ -36,6 +38,17 @@ public class ServiceUnitStateCompactionStrategy implements 
TopicCompactionStrate
         schema = Schema.JSON(ServiceUnitStateData.class);
     }
 
+    public void setSkippedMsgHandler(BiConsumer<String, ServiceUnitStateData> 
skippedMsgHandler) {
+        this.skippedMsgHandler = skippedMsgHandler;
+    }
+
+    @Override
+    public void handleSkippedMessage(String key, ServiceUnitStateData cur) {
+        if (skippedMsgHandler != null) {
+            skippedMsgHandler.accept(key, cur);
+        }
+    }
+
     @Override
     public Schema<ServiceUnitStateData> getSchema() {
         return schema;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 1263170bec4..d8b4f1d75d2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -53,6 +53,7 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertNull;
 import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
@@ -199,7 +200,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
-    @Test(priority = 0)
+    @Test(priority = -1)
     public void channelOwnerTest() throws Exception {
         var channelOwner1 = channel1.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
         var channelOwner2 = channel2.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
@@ -947,8 +948,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         } catch (CompletionException e) {
             ex = e;
         }
-        assertNotNull(ex);
-        assertEquals(TimeoutException.class, ex.getCause().getClass());
+        assertNull(ex);
         assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(bundle).get());
         assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(bundle).get());
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
index 0ecd09606ce..50d4cdc7b0a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
@@ -41,13 +41,13 @@ public class TopicCompactionStrategyTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testLoadInvalidTopicCompactionStrategy() {
-        TopicCompactionStrategy.load("uknown");
+        TopicCompactionStrategy.load("uknown", "uknown");
     }
 
     @Test
     public void testNumericOrderCompactionStrategy() {
         TopicCompactionStrategy<Integer> strategy =
-                
TopicCompactionStrategy.load(NumericOrderCompactionStrategy.class.getCanonicalName());
+                TopicCompactionStrategy.load("numeric", 
NumericOrderCompactionStrategy.class.getCanonicalName());
         Assert.assertFalse(strategy.shouldKeepLeft(1, 2));
         Assert.assertTrue(strategy.shouldKeepLeft(2, 1));
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 77aba7e48cb..560636f9462 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static 
org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -65,7 +66,8 @@ public class TableViewImpl<T> implements TableView<T> {
         this.immutableData = Collections.unmodifiableMap(data);
         this.listeners = new ArrayList<>();
         this.listenersMutex = new ReentrantLock();
-        this.compactionStrategy = 
TopicCompactionStrategy.load(conf.getTopicCompactionStrategyClassName());
+        this.compactionStrategy =
+                TopicCompactionStrategy.load(TABLE_VIEW_TAG, 
conf.getTopicCompactionStrategyClassName());
         ReaderBuilder<T> readerBuilder = client.newReader(schema)
                 .topic(conf.getTopicName())
                 .startMessageId(MessageId.earliest)
@@ -198,6 +200,7 @@ public class TableViewImpl<T> implements TableView<T> {
                                 key,
                                 cur,
                                 prev);
+                        compactionStrategy.handleSkippedMessage(key, cur);
                     }
                 }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
index f06374b234f..51fb6742961 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.common.topics;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.client.api.Schema;
 
 /**
@@ -45,6 +47,9 @@ import org.apache.pulsar.client.api.Schema;
  */
 public interface TopicCompactionStrategy<T> {
 
+    String TABLE_VIEW_TAG = "table-view";
+    Map<String, TopicCompactionStrategy> INSTANCES = new ConcurrentHashMap<>();
+
     /**
      * Returns the schema object for this strategy.
      * @return
@@ -60,17 +65,27 @@ public interface TopicCompactionStrategy<T> {
      */
     boolean shouldKeepLeft(T prev, T cur);
 
-    static TopicCompactionStrategy load(String 
topicCompactionStrategyClassName) {
+    default void handleSkippedMessage(String key, T cur) {
+    }
+
+
+    static TopicCompactionStrategy load(String tag, String 
topicCompactionStrategyClassName) {
         if (topicCompactionStrategyClassName == null) {
             return null;
         }
+
         try {
             Class<?> clazz = Class.forName(topicCompactionStrategyClassName);
-            Object instance = clazz.getDeclaredConstructor().newInstance();
-            return (TopicCompactionStrategy) instance;
+            TopicCompactionStrategy instance = (TopicCompactionStrategy) 
clazz.getDeclaredConstructor().newInstance();
+            INSTANCES.put(tag, instance);
+            return instance;
         } catch (Exception e) {
             throw new IllegalArgumentException(
                     "Error when loading topic compaction strategy: " + 
topicCompactionStrategyClassName, e);
         }
     }
+
+    static TopicCompactionStrategy getInstance(String tag) {
+        return INSTANCES.get(tag);
+    }
 }

Reply via email to