This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 06e4db5c821 [improve][broker] PIP-192: Implement load data store (#18777)
06e4db5c821 is described below

commit 06e4db5c821b2ba9241040f5db52f882e83e8cd8
Author: Kai Wang <[email protected]>
AuthorDate: Wed Feb 1 10:05:06 2023 +0800

    [improve][broker] PIP-192: Implement load data store (#18777)
---
 .../extensions/store/LoadDataStore.java            |  12 ++
 .../extensions/store/LoadDataStoreFactory.java     |  32 ++++++
 .../store/TableViewLoadDataStoreImpl.java          |  90 +++++++++++++++
 .../extensions/store/LoadDataStoreTest.java        | 127 +++++++++++++++++++++
 .../strategy/LeastResourceUsageWithWeightTest.java |  11 ++
 5 files changed, 272 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
index 174e656167d..512811e1019 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -40,6 +40,13 @@ public interface LoadDataStore<T> extends Closeable {
      */
     CompletableFuture<Void> pushAsync(String key, T loadData);
 
+    /**
+     * Async remove load data from store.
+     *
+     * @param key The load data key to remove.
+     */
+    CompletableFuture<Void> removeAsync(String key);
+
     /**
      * Get load data by key.
      *
@@ -62,4 +69,9 @@ public interface LoadDataStore<T> extends Closeable {
      */
     Set<Map.Entry<String, T>> entrySet();
 
+    /**
+     * The load data key count.
+     */
+    int size();
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
new file mode 100644
index 00000000000..18f39abd76b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import org.apache.pulsar.client.api.PulsarClient;
+
+/**
+ * The load data store factory, used to create the load data store.
+ */
+public class LoadDataStoreFactory {
+
+    public static <T> LoadDataStore<T> create(PulsarClient client, String name, Class<T> clazz)
+            throws LoadDataStoreException {
+        return new TableViewLoadDataStoreImpl<>(client, name, clazz);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
new file mode 100644
index 00000000000..3909de2afa2
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+
+/**
+ * The load data store, based on {@link TableView}.
+ *
+ * @param <T> Load data type.
+ */
+public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
+
+    private final TableView<T> tableView;
+
+    private final Producer<T> producer;
+
+    public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class<T> clazz) throws LoadDataStoreException {
+        try {
+            this.tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+            this.producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
+        } catch (Exception e) {
+            throw new LoadDataStoreException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> pushAsync(String key, T loadData) {
+        return producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
+    }
+
+    @Override
+    public CompletableFuture<Void> removeAsync(String key) {
+        return producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
+    }
+
+    @Override
+    public Optional<T> get(String key) {
+        return Optional.ofNullable(tableView.get(key));
+    }
+
+    @Override
+    public void forEach(BiConsumer<String, T> action) {
+        tableView.forEach(action);
+    }
+
+    public Set<Map.Entry<String, T>> entrySet() {
+        return tableView.entrySet();
+    }
+
+    @Override
+    public int size() {
+        return tableView.size();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (producer != null) {
+            producer.close();
+        }
+        if (tableView != null) {
+            tableView.close();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
new file mode 100644
index 00000000000..5e2924cd842
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+import com.google.common.collect.Sets;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+@Test(groups = "broker")
+public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    static class MyClass {
+        String a;
+        int b;
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        createDefaultTenantInfo();
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(configClusterName)));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPushGetAndRemove() throws Exception {
+
+        String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+
+        @Cleanup
+        LoadDataStore<MyClass> loadDataStore =
+                LoadDataStoreFactory.create(pulsar.getClient(), topic, MyClass.class);
+        MyClass myClass1 = new MyClass("1", 1);
+        loadDataStore.pushAsync("key1", myClass1).get();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(loadDataStore.get("key1").isPresent());
+            assertEquals(loadDataStore.get("key1").get(), myClass1);
+        });
+        assertEquals(loadDataStore.size(), 1);
+
+        MyClass myClass2 = new MyClass("2", 2);
+        loadDataStore.pushAsync("key2", myClass2).get();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(loadDataStore.get("key2").isPresent());
+            assertEquals(loadDataStore.get("key2").get(), myClass2);
+        });
+        assertEquals(loadDataStore.size(), 2);
+
+        loadDataStore.removeAsync("key2").get();
+        Awaitility.await().untilAsserted(() -> assertFalse(loadDataStore.get("key2").isPresent()));
+        assertEquals(loadDataStore.size(), 1);
+
+    }
+
+    @Test
+    public void testForEach() throws Exception {
+
+        String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+
+        @Cleanup
+        LoadDataStore<Integer> loadDataStore =
+                LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
+
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 0; i < 10; i++) {
+            String key = "key-" + i;
+            Integer value = i;
+            loadDataStore.pushAsync(key, value).get();
+            map.put(key, value);
+        }
+        Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.size(), 10));
+
+        loadDataStore.forEach((key, value) -> {
+            assertTrue(loadDataStore.get(key).isPresent());
+            assertEquals(loadDataStore.get(key).get(), map.get(key));
+        });
+
+        assertEquals(loadDataStore.entrySet(), map.entrySet());
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index db3d8f9304c..ef0e65762f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -216,6 +216,12 @@ public class LeastResourceUsageWithWeightTest {
                 return null;
             }
 
+            @Override
+            public CompletableFuture<Void> removeAsync(String key) {
+                map.remove(key);
+                return CompletableFuture.completedFuture(null);
+            }
+
             @Override
             public Optional<BrokerLoadData> get(String key) {
                 var val = map.get(key);
@@ -234,6 +240,11 @@ public class LeastResourceUsageWithWeightTest {
             public Set<Map.Entry<String, BrokerLoadData>> entrySet() {
                 return map.entrySet();
             }
+
+            @Override
+            public int size() {
+                return map.size();
+            }
         };
 
         doReturn(conf).when(ctx).brokerConfiguration();

Reply via email to