This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 25d3a86a07b [improve][offload] Extend the offload policies to allow 
specifying more conf (#20804)
25d3a86a07b is described below

commit 25d3a86a07ba98046b3f1204f954261ecccd976a
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Jul 18 11:43:51 2023 +0800

    [improve][offload] Extend the offload policies to allow specifying more 
conf (#20804)
    
    ### Motivation
    
    The offload policies currently restrict which configurations can be
    passed to the offloaders. As a result, whenever an offloader needs an
    additional setting, a new field has to be added to OffloadPoliciesImpl,
    which is not sustainable. The policies should be easy to extend, so this
    change adds support for a configuration map that allows extra
    configurations to be set.
---
 .../common/policies/data/OffloadPoliciesImpl.java  | 14 +++++++
 .../org/apache/pulsar/common/util/FieldParser.java |  9 +++++
 .../common/policies/data/OffloadPoliciesTest.java  | 13 +++++++
 .../apache/pulsar/common/util/FieldParserTest.java | 45 ++++++++++++++++++++++
 4 files changed, 81 insertions(+)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 843c1bde3b9..f9148ba8699 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import lombok.Data;
@@ -118,6 +119,9 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private OffloadedReadPriority managedLedgerOffloadedReadPriority = 
DEFAULT_OFFLOADED_READ_PRIORITY;
+    @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+    private Map<String, String> managedLedgerExtraConfigurations = null;
 
     // s3 config, set by service configuration or cli
     @Configuration
@@ -257,6 +261,14 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
                 }
             }
         });
+        Map<String, String> extraConfigurations = 
properties.entrySet().stream()
+            .filter(entry -> 
entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
+            .collect(Collectors.toMap(
+                entry -> 
entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
+                entry -> entry.getValue().toString()));
+
+        data.setManagedLedgerExtraConfigurations(extraConfigurations);
+
         data.compatibleWithBrokerConfigFile(properties);
         return data;
     }
@@ -347,6 +359,8 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
                 this.getManagedLedgerOffloadThresholdInSeconds());
         setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
                 this.getManagedLedgerOffloadDeletionLagInMillis());
+        setProperty(properties, "managedLedgerOffloadExtraConfigurations",
+                this.getManagedLedgerExtraConfigurations());
 
         if (this.isS3Driver()) {
             setProperty(properties, "s3ManagedLedgerOffloadRegion",
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index 626a14b92ee..8d1ae5294ff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -314,6 +314,9 @@ public final class FieldParser {
      * @return The converted list with type {@code <T>}.
      */
     public static <T> List<T> stringToList(String val, Class<T> type) {
+        if (val == null) {
+            return null;
+        }
         String[] tokens = trim(val).split(",");
         return Arrays.stream(tokens).map(t -> {
             return convert(trim(t), type);
@@ -330,6 +333,9 @@ public final class FieldParser {
      * @return The converted set with type {@code <T>}.
      */
     public static <T> Set<T> stringToSet(String val, Class<T> type) {
+        if (val == null) {
+            return null;
+        }
         String[] tokens = trim(val).split(",");
         return Arrays.stream(tokens).map(t -> {
             return convert(trim(t), type);
@@ -337,6 +343,9 @@ public final class FieldParser {
     }
 
     private static <K, V> Map<K, V> stringToMap(String strValue, Class<K> 
keyType, Class<V> valueType) {
+        if (strValue == null) {
+            return null;
+        }
         String[] tokens = trim(strValue).split(",");
         Map<K, V> map = new HashMap<>();
         for (String token : tokens) {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 00b9aab0b15..d79d2c32ffa 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -26,6 +26,7 @@ import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Map;
 import java.util.Properties;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -432,4 +433,16 @@ public class OffloadPoliciesTest {
         }
     }
 
+    @Test
+    public void testCreateOffloadPoliciesWithExtraConfiguration() {
+        Properties properties = new Properties();
+        properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
+        properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
+        OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
+
+        Map<String, String> extraConfigurations = 
policies.getManagedLedgerExtraConfigurations();
+        Assert.assertEquals(extraConfigurations.size(), 2);
+        Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
+        Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
+    }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
index e90b6cbc4a1..b24e9ae4082 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java
@@ -19,12 +19,15 @@
 package org.apache.pulsar.common.util;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import java.util.Optional;
 import java.util.Set;
 import org.testng.annotations.Test;
 
@@ -94,4 +97,46 @@ public class FieldParserTest {
         public Set<String> stringSet;
     }
 
+    @Test
+    public void testNullStrValue() throws Exception {
+        class TestMap {
+            public List<String> list;
+            public Set<String> set;
+            public Map<String, String> map;
+            public Optional<String> optional;
+        }
+
+        Field listField = TestMap.class.getField("list");
+        Object listValue = FieldParser.value(null, listField);
+        assertNull(listValue);
+
+        listValue = FieldParser.value("null", listField);
+        assertTrue(listValue instanceof List);
+        assertEquals(((List) listValue).size(), 1);
+        assertEquals(((List) listValue).get(0), "null");
+
+
+        Field setField = TestMap.class.getField("set");
+        Object setValue = FieldParser.value(null, setField);
+        assertNull(setValue);
+
+        setValue = FieldParser.value("null", setField);
+        assertTrue(setValue instanceof Set);
+        assertEquals(((Set) setValue).size(), 1);
+        assertEquals(((Set) setValue).iterator().next(), "null");
+
+        Field mapField = TestMap.class.getField("map");
+        Object mapValue = FieldParser.value(null, mapField);
+        assertNull(mapValue);
+
+        try {
+            FieldParser.value("null", mapField);
+        } catch (IllegalArgumentException iae) {
+            assertTrue(iae.getMessage().contains("null map-value is not in 
correct format key1=value,key2=value2"));
+        }
+
+        Field optionalField = TestMap.class.getField("optional");
+        Object optionalValue = FieldParser.value(null, optionalField);
+        assertEquals(optionalValue, Optional.empty());
+    }
 }

Reply via email to