This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1687244ff0e [improve][offload] Extend the offload policies to allow 
specifying more conf (#20804)
1687244ff0e is described below

commit 1687244ff0e9d12dd0d864136f9b39623f0446bc
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Jul 18 11:43:51 2023 +0800

    [improve][offload] Extend the offload policies to allow specifying more 
conf (#20804)
    
    ### Motivation
    
    The offload policies currently expose only a fixed set of configuration 
fields for the offloaders. That means if an offloader needs more configuration, 
we have to add more fields to OffloadPoliciesImpl, which does not scale. We 
should make the policies easily extensible. Add support for a configuration map 
so that additional offloader configurations can be set.
---
 .../common/policies/data/OffloadPoliciesImpl.java  | 14 +++++++
 .../org/apache/pulsar/common/util/FieldParser.java |  9 +++++
 .../common/policies/data/OffloadPoliciesTest.java  | 13 +++++++
 .../apache/pulsar/common/util/FieldParserTest.java | 45 ++++++++++++++++++++++
 4 files changed, 81 insertions(+)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 843c1bde3b9..f9148ba8699 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import lombok.Data;
@@ -118,6 +119,9 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private OffloadedReadPriority managedLedgerOffloadedReadPriority = 
DEFAULT_OFFLOADED_READ_PRIORITY;
+    @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+    private Map<String, String> managedLedgerExtraConfigurations = null;
 
     // s3 config, set by service configuration or cli
     @Configuration
@@ -257,6 +261,14 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
                 }
             }
         });
+        Map<String, String> extraConfigurations = 
properties.entrySet().stream()
+            .filter(entry -> 
entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
+            .collect(Collectors.toMap(
+                entry -> 
entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
+                entry -> entry.getValue().toString()));
+
+        data.setManagedLedgerExtraConfigurations(extraConfigurations);
+
         data.compatibleWithBrokerConfigFile(properties);
         return data;
     }
@@ -347,6 +359,8 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
                 this.getManagedLedgerOffloadThresholdInSeconds());
         setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
                 this.getManagedLedgerOffloadDeletionLagInMillis());
+        setProperty(properties, "managedLedgerOffloadExtraConfigurations",
+                this.getManagedLedgerExtraConfigurations());
 
         if (this.isS3Driver()) {
             setProperty(properties, "s3ManagedLedgerOffloadRegion",
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index 626a14b92ee..8d1ae5294ff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -314,6 +314,9 @@ public final class FieldParser {
      * @return The converted list with type {@code <T>}.
      */
     public static <T> List<T> stringToList(String val, Class<T> type) {
+        if (val == null) {
+            return null;
+        }
         String[] tokens = trim(val).split(",");
         return Arrays.stream(tokens).map(t -> {
             return convert(trim(t), type);
@@ -330,6 +333,9 @@ public final class FieldParser {
      * @return The converted set with type {@code <T>}.
      */
     public static <T> Set<T> stringToSet(String val, Class<T> type) {
+        if (val == null) {
+            return null;
+        }
         String[] tokens = trim(val).split(",");
         return Arrays.stream(tokens).map(t -> {
             return convert(trim(t), type);
@@ -337,6 +343,9 @@ public final class FieldParser {
     }
 
     private static <K, V> Map<K, V> stringToMap(String strValue, Class<K> 
keyType, Class<V> valueType) {
+        if (strValue == null) {
+            return null;
+        }
         String[] tokens = trim(strValue).split(",");
         Map<K, V> map = new HashMap<>();
         for (String token : tokens) {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 00b9aab0b15..d79d2c32ffa 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -26,6 +26,7 @@ import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Map;
 import java.util.Properties;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -432,4 +433,16 @@ public class OffloadPoliciesTest {
         }
     }
 
+    @Test
+    public void testCreateOffloadPoliciesWithExtraConfiguration() {
+        Properties properties = new Properties();
+        properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
+        properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
+        OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
+
+        Map<String, String> extraConfigurations = 
policies.getManagedLedgerExtraConfigurations();
+        Assert.assertEquals(extraConfigurations.size(), 2);
+        Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
+        Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
+    }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
index e90b6cbc4a1..b24e9ae4082 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
@@ -19,12 +19,15 @@
 package org.apache.pulsar.common.util;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import java.util.Optional;
 import java.util.Set;
 import org.testng.annotations.Test;
 
@@ -94,4 +97,46 @@ public class FieldParserTest {
         public Set<String> stringSet;
     }
 
+    @Test
+    public void testNullStrValue() throws Exception {
+        class TestMap {
+            public List<String> list;
+            public Set<String> set;
+            public Map<String, String> map;
+            public Optional<String> optional;
+        }
+
+        Field listField = TestMap.class.getField("list");
+        Object listValue = FieldParser.value(null, listField);
+        assertNull(listValue);
+
+        listValue = FieldParser.value("null", listField);
+        assertTrue(listValue instanceof List);
+        assertEquals(((List) listValue).size(), 1);
+        assertEquals(((List) listValue).get(0), "null");
+
+
+        Field setField = TestMap.class.getField("set");
+        Object setValue = FieldParser.value(null, setField);
+        assertNull(setValue);
+
+        setValue = FieldParser.value("null", setField);
+        assertTrue(setValue instanceof Set);
+        assertEquals(((Set) setValue).size(), 1);
+        assertEquals(((Set) setValue).iterator().next(), "null");
+
+        Field mapField = TestMap.class.getField("map");
+        Object mapValue = FieldParser.value(null, mapField);
+        assertNull(mapValue);
+
+        try {
+            FieldParser.value("null", mapField);
+        } catch (IllegalArgumentException iae) {
+            assertTrue(iae.getMessage().contains("null map-value is not in 
correct format key1=value,key2=value2"));
+        }
+
+        Field optionalField = TestMap.class.getField("optional");
+        Object optionalValue = FieldParser.value(null, optionalField);
+        assertEquals(optionalValue, Optional.empty());
+    }
 }

Reply via email to