This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5f0a1604f96 [fix][broker] Duplicate LedgerOffloader creation when 
namespace/topic… (#21591)
5f0a1604f96 is described below

commit 5f0a1604f96cda8f404baaedd94491f8cd3335f5
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Nov 20 17:17:28 2023 +0800

    [fix][broker] Duplicate LedgerOffloader creation when namespace/topic… 
(#21591)
    
    (cherry picked from commit 98bf9dd72910e1b02dea17148a4199e3b26d7147)
---
 .../common/policies/data/OffloadPoliciesImpl.java  | 96 ++++++----------------
 .../common/policies/data/OffloadPoliciesTest.java  | 31 ++++++-
 2 files changed, 55 insertions(+), 72 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index f9148ba8699..51e181811c2 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -30,6 +30,7 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -86,6 +87,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
     public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
     public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS = null;
     public static final Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
+    public static final String EXTRA_CONFIG_PREFIX = 
"managedLedgerOffloadExtraConfig";
 
     public static final String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
             "managedLedgerOffloadAutoTriggerSizeThresholdBytes";
@@ -121,8 +123,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
     private OffloadedReadPriority managedLedgerOffloadedReadPriority = 
DEFAULT_OFFLOADED_READ_PRIORITY;
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
-    private Map<String, String> managedLedgerExtraConfigurations = null;
-
+    private Map<String, String> managedLedgerExtraConfigurations = new 
HashMap<>();
     // s3 config, set by service configuration or cli
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -248,8 +249,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 
     public static OffloadPoliciesImpl create(Properties properties) {
         OffloadPoliciesImpl data = new OffloadPoliciesImpl();
-        Field[] fields = OffloadPoliciesImpl.class.getDeclaredFields();
-        Arrays.stream(fields).forEach(f -> {
+        for (Field f : CONFIGURATION_FIELDS) {
             if (properties.containsKey(f.getName())) {
                 try {
                     f.setAccessible(true);
@@ -260,14 +260,15 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
                                     f.getName(), properties.get(f.getName())), 
e);
                 }
             }
-        });
+        }
+
         Map<String, String> extraConfigurations = 
properties.entrySet().stream()
-            .filter(entry -> 
entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
-            .collect(Collectors.toMap(
-                entry -> 
entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
-                entry -> entry.getValue().toString()));
+                .filter(entry -> 
entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX))
+                .collect(Collectors.toMap(
+                        entry -> 
entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""),
+                        entry -> entry.getValue().toString()));
 
-        data.setManagedLedgerExtraConfigurations(extraConfigurations);
+        data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations);
 
         data.compatibleWithBrokerConfigFile(properties);
         return data;
@@ -346,66 +347,21 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 
     public Properties toProperties() {
         Properties properties = new Properties();
-        setProperty(properties, "managedLedgerOffloadedReadPriority", 
this.getManagedLedgerOffloadedReadPriority());
-        setProperty(properties, "offloadersDirectory", 
this.getOffloadersDirectory());
-        setProperty(properties, "managedLedgerOffloadDriver", 
this.getManagedLedgerOffloadDriver());
-        setProperty(properties, "managedLedgerOffloadMaxThreads",
-                this.getManagedLedgerOffloadMaxThreads());
-        setProperty(properties, "managedLedgerOffloadPrefetchRounds",
-                this.getManagedLedgerOffloadPrefetchRounds());
-        setProperty(properties, "managedLedgerOffloadThresholdInBytes",
-                this.getManagedLedgerOffloadThresholdInBytes());
-        setProperty(properties, "managedLedgerOffloadThresholdInSeconds",
-                this.getManagedLedgerOffloadThresholdInSeconds());
-        setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
-                this.getManagedLedgerOffloadDeletionLagInMillis());
-        setProperty(properties, "managedLedgerOffloadExtraConfigurations",
-                this.getManagedLedgerExtraConfigurations());
-
-        if (this.isS3Driver()) {
-            setProperty(properties, "s3ManagedLedgerOffloadRegion",
-                    this.getS3ManagedLedgerOffloadRegion());
-            setProperty(properties, "s3ManagedLedgerOffloadBucket",
-                    this.getS3ManagedLedgerOffloadBucket());
-            setProperty(properties, "s3ManagedLedgerOffloadServiceEndpoint",
-                    this.getS3ManagedLedgerOffloadServiceEndpoint());
-            setProperty(properties, 
"s3ManagedLedgerOffloadMaxBlockSizeInBytes",
-                    this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
-            setProperty(properties, "s3ManagedLedgerOffloadCredentialId",
-                    this.getS3ManagedLedgerOffloadCredentialId());
-            setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret",
-                    this.getS3ManagedLedgerOffloadCredentialSecret());
-            setProperty(properties, "s3ManagedLedgerOffloadRole",
-                    this.getS3ManagedLedgerOffloadRole());
-            setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
-                    this.getS3ManagedLedgerOffloadRoleSessionName());
-            setProperty(properties, 
"s3ManagedLedgerOffloadReadBufferSizeInBytes",
-                    this.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
-        } else if (this.isGcsDriver()) {
-            setProperty(properties, "gcsManagedLedgerOffloadRegion",
-                    this.getGcsManagedLedgerOffloadRegion());
-            setProperty(properties, "gcsManagedLedgerOffloadBucket",
-                    this.getGcsManagedLedgerOffloadBucket());
-            setProperty(properties, 
"gcsManagedLedgerOffloadMaxBlockSizeInBytes",
-                    this.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
-            setProperty(properties, 
"gcsManagedLedgerOffloadReadBufferSizeInBytes",
-                    this.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
-            setProperty(properties, 
"gcsManagedLedgerOffloadServiceAccountKeyFile",
-                    this.getGcsManagedLedgerOffloadServiceAccountKeyFile());
-        } else if (this.isFileSystemDriver()) {
-            setProperty(properties, "fileSystemProfilePath", 
this.getFileSystemProfilePath());
-            setProperty(properties, "fileSystemURI", this.getFileSystemURI());
-        }
-
-        setProperty(properties, "managedLedgerOffloadBucket", 
this.getManagedLedgerOffloadBucket());
-        setProperty(properties, "managedLedgerOffloadRegion", 
this.getManagedLedgerOffloadRegion());
-        setProperty(properties, "managedLedgerOffloadServiceEndpoint",
-                this.getManagedLedgerOffloadServiceEndpoint());
-        setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes",
-                this.getManagedLedgerOffloadMaxBlockSizeInBytes());
-        setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes",
-                this.getManagedLedgerOffloadReadBufferSizeInBytes());
-
+        for (Field f : CONFIGURATION_FIELDS) {
+            try {
+                f.setAccessible(true);
+                if ("managedLedgerExtraConfigurations".equals(f.getName())) {
+                    Map<String, String> extraConfig = (Map<String, String>) 
f.get(this);
+                    extraConfig.forEach((key, value) -> {
+                        setProperty(properties, EXTRA_CONFIG_PREFIX + key, 
value);
+                    });
+                } else {
+                    setProperty(properties, f.getName(), f.get(this));
+                }
+            } catch (Exception e) {
+                throw new IllegalArgumentException("An error occurred while 
processing the field: " + f.getName(), e);
+            }
+        }
         return properties;
     }
 
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index d79d2c32ffa..bbede4e9820 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import static 
org.apache.pulsar.common.policies.data.OffloadPoliciesImpl.EXTRA_CONFIG_PREFIX;
+import static org.testng.Assert.assertEquals;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
@@ -26,6 +28,7 @@ import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import org.testng.Assert;
@@ -436,8 +439,8 @@ public class OffloadPoliciesTest {
     @Test
     public void testCreateOffloadPoliciesWithExtraConfiguration() {
         Properties properties = new Properties();
-        properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
-        properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
+        properties.put(EXTRA_CONFIG_PREFIX + "Key1", "value1");
+        properties.put(EXTRA_CONFIG_PREFIX + "Key2", "value2");
         OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
 
         Map<String, String> extraConfigurations = 
policies.getManagedLedgerExtraConfigurations();
@@ -445,4 +448,28 @@ public class OffloadPoliciesTest {
         Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
         Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
     }
+
+    /**
+     * Test toProperties as well as create from properties.
+     * @throws Exception
+     */
+    @Test
+    public void testToProperties() throws Exception {
+        // Base information convert.
+        OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
+                "http://test.endpoint";, null, null, null, null, 32 * 1024 * 
1024, 5 * 1024 * 1024,
+                10 * 1024 * 1024L, 100L, 10000L, 
OffloadedReadPriority.TIERED_STORAGE_FIRST);
+        assertEquals(offloadPolicies, 
OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+
+        // Set useless config to offload policies. Make sure convert 
conversion result is the same.
+        offloadPolicies.setFileSystemProfilePath("/test/file");
+        assertEquals(offloadPolicies, 
OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+
+        // Set extra config to offload policies. Make sure convert conversion 
result is the same.
+        Map<String, String> extraConfiguration = new HashMap<>();
+        extraConfiguration.put("key1", "value1");
+        extraConfiguration.put("key2", "value2");
+        
offloadPolicies.setManagedLedgerExtraConfigurations(extraConfiguration); 
+        assertEquals(offloadPolicies, 
OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+    }
 }

Reply via email to