This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1687244ff0e [improve][offload] Extend the offload policies to allow
specifying more conf (#20804)
1687244ff0e is described below
commit 1687244ff0e9d12dd0d864136f9b39623f0446bc
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Jul 18 11:43:51 2023 +0800
[improve][offload] Extend the offload policies to allow specifying more
conf (#20804)
### Motivation
The offload policies have limited the configurations for the offloaders.
That means if the offloader needs more configurations, we need to extend more
fields in the OffloadPoliciesImpl. That doesn't make sense. We should make it
extendable easily. Add a configuration map support to allow it to set more
configurations.
---
.../common/policies/data/OffloadPoliciesImpl.java | 14 +++++++
.../org/apache/pulsar/common/util/FieldParser.java | 9 +++++
.../common/policies/data/OffloadPoliciesTest.java | 13 +++++++
.../apache/pulsar/common/util/FieldParserTest.java | 45 ++++++++++++++++++++++
4 files changed, 81 insertions(+)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 843c1bde3b9..f9148ba8699 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.Data;
@@ -118,6 +119,9 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private OffloadedReadPriority managedLedgerOffloadedReadPriority =
DEFAULT_OFFLOADED_READ_PRIORITY;
+ @Configuration
+ @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+ private Map<String, String> managedLedgerExtraConfigurations = null;
// s3 config, set by service configuration or cli
@Configuration
@@ -257,6 +261,14 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
}
}
});
+ Map<String, String> extraConfigurations =
properties.entrySet().stream()
+ .filter(entry ->
entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
+ .collect(Collectors.toMap(
+ entry ->
entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
+ entry -> entry.getValue().toString()));
+
+ data.setManagedLedgerExtraConfigurations(extraConfigurations);
+
data.compatibleWithBrokerConfigFile(properties);
return data;
}
@@ -347,6 +359,8 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
this.getManagedLedgerOffloadThresholdInSeconds());
setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
this.getManagedLedgerOffloadDeletionLagInMillis());
+ setProperty(properties, "managedLedgerOffloadExtraConfigurations",
+ this.getManagedLedgerExtraConfigurations());
if (this.isS3Driver()) {
setProperty(properties, "s3ManagedLedgerOffloadRegion",
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index 626a14b92ee..8d1ae5294ff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -314,6 +314,9 @@ public final class FieldParser {
* @return The converted list with type {@code <T>}.
*/
public static <T> List<T> stringToList(String val, Class<T> type) {
+ if (val == null) {
+ return null;
+ }
String[] tokens = trim(val).split(",");
return Arrays.stream(tokens).map(t -> {
return convert(trim(t), type);
@@ -330,6 +333,9 @@ public final class FieldParser {
* @return The converted set with type {@code <T>}.
*/
public static <T> Set<T> stringToSet(String val, Class<T> type) {
+ if (val == null) {
+ return null;
+ }
String[] tokens = trim(val).split(",");
return Arrays.stream(tokens).map(t -> {
return convert(trim(t), type);
@@ -337,6 +343,9 @@ public final class FieldParser {
}
private static <K, V> Map<K, V> stringToMap(String strValue, Class<K>
keyType, Class<V> valueType) {
+ if (strValue == null) {
+ return null;
+ }
String[] tokens = trim(strValue).split(",");
Map<K, V> map = new HashMap<>();
for (String token : tokens) {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 00b9aab0b15..d79d2c32ffa 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -26,6 +26,7 @@ import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Map;
import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -432,4 +433,16 @@ public class OffloadPoliciesTest {
}
}
+ @Test
+ public void testCreateOffloadPoliciesWithExtraConfiguration() {
+ Properties properties = new Properties();
+ properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
+ properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
+ OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
+
+ Map<String, String> extraConfigurations =
policies.getManagedLedgerExtraConfigurations();
+ Assert.assertEquals(extraConfigurations.size(), 2);
+ Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
+ Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
+ }
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
index e90b6cbc4a1..b24e9ae4082 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
@@ -19,12 +19,15 @@
package org.apache.pulsar.common.util;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.testng.annotations.Test;
@@ -94,4 +97,46 @@ public class FieldParserTest {
public Set<String> stringSet;
}
+ @Test
+ public void testNullStrValue() throws Exception {
+ class TestMap {
+ public List<String> list;
+ public Set<String> set;
+ public Map<String, String> map;
+ public Optional<String> optional;
+ }
+
+ Field listField = TestMap.class.getField("list");
+ Object listValue = FieldParser.value(null, listField);
+ assertNull(listValue);
+
+ listValue = FieldParser.value("null", listField);
+ assertTrue(listValue instanceof List);
+ assertEquals(((List) listValue).size(), 1);
+ assertEquals(((List) listValue).get(0), "null");
+
+
+ Field setField = TestMap.class.getField("set");
+ Object setValue = FieldParser.value(null, setField);
+ assertNull(setValue);
+
+ setValue = FieldParser.value("null", setField);
+ assertTrue(setValue instanceof Set);
+ assertEquals(((Set) setValue).size(), 1);
+ assertEquals(((Set) setValue).iterator().next(), "null");
+
+ Field mapField = TestMap.class.getField("map");
+ Object mapValue = FieldParser.value(null, mapField);
+ assertNull(mapValue);
+
+ try {
+ FieldParser.value("null", mapField);
+ } catch (IllegalArgumentException iae) {
+ assertTrue(iae.getMessage().contains("null map-value is not in
correct format key1=value,key2=value2"));
+ }
+
+ Field optionalField = TestMap.class.getField("optional");
+ Object optionalValue = FieldParser.value(null, optionalField);
+ assertEquals(optionalValue, Optional.empty());
+ }
}