This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 06e4db5c821 [improve][broker] PIP-192: Implement load data store
(#18777)
06e4db5c821 is described below
commit 06e4db5c821b2ba9241040f5db52f882e83e8cd8
Author: Kai Wang <[email protected]>
AuthorDate: Wed Feb 1 10:05:06 2023 +0800
[improve][broker] PIP-192: Implement load data store (#18777)
---
.../extensions/store/LoadDataStore.java | 12 ++
.../extensions/store/LoadDataStoreFactory.java | 32 ++++++
.../store/TableViewLoadDataStoreImpl.java | 90 +++++++++++++++
.../extensions/store/LoadDataStoreTest.java | 127 +++++++++++++++++++++
.../strategy/LeastResourceUsageWithWeightTest.java | 11 ++
5 files changed, 272 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
index 174e656167d..512811e1019 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -40,6 +40,13 @@ public interface LoadDataStore<T> extends Closeable {
*/
CompletableFuture<Void> pushAsync(String key, T loadData);
+ /**
+ * Asynchronously remove the load data stored under the given key.
+ *
+ * @param key The load data key to remove.
+ * @return a future representing the asynchronous remove operation.
+ */
+ CompletableFuture<Void> removeAsync(String key);
+
/**
* Get load data by key.
*
@@ -62,4 +69,9 @@ public interface LoadDataStore<T> extends Closeable {
*/
Set<Map.Entry<String, T>> entrySet();
+ /**
+ * Get the load data key count.
+ *
+ * @return the number of load data keys in the store.
+ */
+ int size();
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
new file mode 100644
index 00000000000..18f39abd76b
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import org.apache.pulsar.client.api.PulsarClient;
+
+/**
+ * Factory used to create {@link LoadDataStore} instances.
+ */
+public class LoadDataStoreFactory {
+
+    /**
+     * Create a load data store backed by {@link TableViewLoadDataStoreImpl}.
+     *
+     * @param client the Pulsar client handed to the store implementation.
+     * @param name   the name handed to the store implementation (it is used as the topic name there).
+     * @param clazz  the class of the load data values.
+     * @param <T>    the load data type.
+     * @return the newly created load data store.
+     * @throws LoadDataStoreException if the store implementation fails to initialize.
+     */
+    public static <T> LoadDataStore<T> create(PulsarClient client, String name, Class<T> clazz)
+            throws LoadDataStoreException {
+        final LoadDataStore<T> store = new TableViewLoadDataStoreImpl<>(client, name, clazz);
+        return store;
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
new file mode 100644
index 00000000000..3909de2afa2
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+
+/**
+ * The load data store, base on {@link TableView <T>}.
+ *
+ * @param <T> Load data type.
+ */
+public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
+
+ private final TableView<T> tableView;
+
+ private final Producer<T> producer;
+
+ public TableViewLoadDataStoreImpl(PulsarClient client, String topic,
Class<T> clazz) throws LoadDataStoreException {
+ try {
+ this.tableView =
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+ this.producer =
client.newProducer(Schema.JSON(clazz)).topic(topic).create();
+ } catch (Exception e) {
+ throw new LoadDataStoreException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> pushAsync(String key, T loadData) {
+ return
producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
+ }
+
+ @Override
+ public CompletableFuture<Void> removeAsync(String key) {
+ return
producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
+ }
+
+ @Override
+ public Optional<T> get(String key) {
+ return Optional.ofNullable(tableView.get(key));
+ }
+
+ @Override
+ public void forEach(BiConsumer<String, T> action) {
+ tableView.forEach(action);
+ }
+
+ public Set<Map.Entry<String, T>> entrySet() {
+ return tableView.entrySet();
+ }
+
+ @Override
+ public int size() {
+ return tableView.size();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (producer != null) {
+ producer.close();
+ }
+ if (tableView != null) {
+ tableView.close();
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
new file mode 100644
index 00000000000..5e2924cd842
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+import com.google.common.collect.Sets;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests for the {@link LoadDataStore} returned by {@link LoadDataStoreFactory}.
+ */
+@Test(groups = "broker")
+public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
+
+    /** Simple value type used as the load data payload. */
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    static class MyClass {
+        String a;
+        int b;
+    }
+
+    /** Starts the mocked service and creates the system tenant and namespace used by the tests. */
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        createDefaultTenantInfo();
+
+        TenantInfoImpl systemTenantInfo =
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(configClusterName));
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), systemTenantInfo);
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** Builds a persistent topic name with a random local name in the system namespace. */
+    private static String randomSystemTopic() {
+        return String.format("%s://%s/%s",
+                TopicDomain.persistent, NamespaceName.SYSTEM_NAMESPACE, UUID.randomUUID());
+    }
+
+    /** Pushes two keys, checks they become readable, then removes one and checks the size. */
+    @Test
+    public void testPushGetAndRemove() throws Exception {
+        String topic = randomSystemTopic();
+
+        @Cleanup
+        LoadDataStore<MyClass> store =
+                LoadDataStoreFactory.create(pulsar.getClient(), topic, MyClass.class);
+
+        MyClass first = new MyClass("1", 1);
+        store.pushAsync("key1", first).get();
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(store.get("key1").isPresent());
+            assertEquals(store.get("key1").get(), first);
+        });
+        assertEquals(store.size(), 1);
+
+        MyClass second = new MyClass("2", 2);
+        store.pushAsync("key2", second).get();
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(store.get("key2").isPresent());
+            assertEquals(store.get("key2").get(), second);
+        });
+        assertEquals(store.size(), 2);
+
+        store.removeAsync("key2").get();
+        Awaitility.await().untilAsserted(() -> assertFalse(store.get("key2").isPresent()));
+        assertEquals(store.size(), 1);
+    }
+
+    /** Pushes ten integer entries and checks that forEach and entrySet expose exactly those entries. */
+    @Test
+    public void testForEach() throws Exception {
+        String topic = randomSystemTopic();
+
+        @Cleanup
+        LoadDataStore<Integer> store =
+                LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
+
+        Map<String, Integer> expected = new HashMap<>();
+        for (int index = 0; index < 10; index++) {
+            String entryKey = "key-" + index;
+            Integer entryValue = index;
+            store.pushAsync(entryKey, entryValue).get();
+            expected.put(entryKey, entryValue);
+        }
+        Awaitility.await().untilAsserted(() -> assertEquals(store.size(), 10));
+
+        store.forEach((key, value) -> {
+            assertTrue(store.get(key).isPresent());
+            assertEquals(store.get(key).get(), expected.get(key));
+        });
+
+        assertEquals(store.entrySet(), expected.entrySet());
+    }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index db3d8f9304c..ef0e65762f1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -216,6 +216,12 @@ public class LeastResourceUsageWithWeightTest {
return null;
}
+ // Test stub: removes the key from `map` and returns an already-completed future.
+ @Override
+ public CompletableFuture<Void> removeAsync(String key) {
+ map.remove(key);
+ return CompletableFuture.completedFuture(null);
+ }
+
@Override
public Optional<BrokerLoadData> get(String key) {
var val = map.get(key);
@@ -234,6 +240,11 @@ public class LeastResourceUsageWithWeightTest {
public Set<Map.Entry<String, BrokerLoadData>> entrySet() {
return map.entrySet();
}
+
+ // Test stub: reports the number of entries currently held in `map`.
+ @Override
+ public int size() {
+ return map.size();
+ }
};
doReturn(conf).when(ctx).brokerConfiguration();