This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 52e1d05436c [fix][broker] Duplicate LedgerOffloader creation when
namespace/topic… (#21591)
52e1d05436c is described below
commit 52e1d05436cc5cabfdd93830fc317bb2d945b04e
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Nov 20 17:17:28 2023 +0800
[fix][broker] Duplicate LedgerOffloader creation when namespace/topic…
(#21591)
(cherry picked from commit 98bf9dd72910e1b02dea17148a4199e3b26d7147)
---
.../common/policies/data/OffloadPoliciesImpl.java | 94 ++++++----------------
.../common/policies/data/OffloadPoliciesTest.java | 31 ++++++-
2 files changed, 55 insertions(+), 70 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index a6f99a06acd..ab2daa37b4d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -30,6 +30,7 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -85,6 +86,7 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
public static final Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
+ public static final String EXTRA_CONFIG_PREFIX =
"managedLedgerOffloadExtraConfig";
public static final String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
"managedLedgerOffloadAutoTriggerSizeThresholdBytes";
@@ -116,8 +118,7 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
private OffloadedReadPriority managedLedgerOffloadedReadPriority =
DEFAULT_OFFLOADED_READ_PRIORITY;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
- private Map<String, String> managedLedgerExtraConfigurations = null;
-
+ private Map<String, String> managedLedgerExtraConfigurations = new
HashMap<>();
// s3 config, set by service configuration or cli
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -240,8 +241,7 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
public static OffloadPoliciesImpl create(Properties properties) {
OffloadPoliciesImpl data = new OffloadPoliciesImpl();
- Field[] fields = OffloadPoliciesImpl.class.getDeclaredFields();
- Arrays.stream(fields).forEach(f -> {
+ for (Field f : CONFIGURATION_FIELDS) {
if (properties.containsKey(f.getName())) {
try {
f.setAccessible(true);
@@ -252,14 +252,15 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
f.getName(), properties.get(f.getName())),
e);
}
}
- });
+ }
+
Map<String, String> extraConfigurations =
properties.entrySet().stream()
- .filter(entry ->
entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
- .collect(Collectors.toMap(
- entry ->
entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
- entry -> entry.getValue().toString()));
+ .filter(entry ->
entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX))
+ .collect(Collectors.toMap(
+ entry ->
entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""),
+ entry -> entry.getValue().toString()));
- data.setManagedLedgerExtraConfigurations(extraConfigurations);
+ data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations);
data.compatibleWithBrokerConfigFile(properties);
return data;
@@ -337,64 +338,21 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
public Properties toProperties() {
Properties properties = new Properties();
- setProperty(properties, "managedLedgerOffloadedReadPriority",
this.getManagedLedgerOffloadedReadPriority());
- setProperty(properties, "offloadersDirectory",
this.getOffloadersDirectory());
- setProperty(properties, "managedLedgerOffloadDriver",
this.getManagedLedgerOffloadDriver());
- setProperty(properties, "managedLedgerOffloadMaxThreads",
- this.getManagedLedgerOffloadMaxThreads());
- setProperty(properties, "managedLedgerOffloadPrefetchRounds",
- this.getManagedLedgerOffloadPrefetchRounds());
- setProperty(properties, "managedLedgerOffloadThresholdInBytes",
- this.getManagedLedgerOffloadThresholdInBytes());
- setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
- this.getManagedLedgerOffloadDeletionLagInMillis());
- setProperty(properties, "managedLedgerOffloadExtraConfigurations",
- this.getManagedLedgerExtraConfigurations());
-
- if (this.isS3Driver()) {
- setProperty(properties, "s3ManagedLedgerOffloadRegion",
- this.getS3ManagedLedgerOffloadRegion());
- setProperty(properties, "s3ManagedLedgerOffloadBucket",
- this.getS3ManagedLedgerOffloadBucket());
- setProperty(properties, "s3ManagedLedgerOffloadServiceEndpoint",
- this.getS3ManagedLedgerOffloadServiceEndpoint());
- setProperty(properties,
"s3ManagedLedgerOffloadMaxBlockSizeInBytes",
- this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
- setProperty(properties, "s3ManagedLedgerOffloadCredentialId",
- this.getS3ManagedLedgerOffloadCredentialId());
- setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret",
- this.getS3ManagedLedgerOffloadCredentialSecret());
- setProperty(properties, "s3ManagedLedgerOffloadRole",
- this.getS3ManagedLedgerOffloadRole());
- setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
- this.getS3ManagedLedgerOffloadRoleSessionName());
- setProperty(properties,
"s3ManagedLedgerOffloadReadBufferSizeInBytes",
- this.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
- } else if (this.isGcsDriver()) {
- setProperty(properties, "gcsManagedLedgerOffloadRegion",
- this.getGcsManagedLedgerOffloadRegion());
- setProperty(properties, "gcsManagedLedgerOffloadBucket",
- this.getGcsManagedLedgerOffloadBucket());
- setProperty(properties,
"gcsManagedLedgerOffloadMaxBlockSizeInBytes",
- this.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
- setProperty(properties,
"gcsManagedLedgerOffloadReadBufferSizeInBytes",
- this.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
- setProperty(properties,
"gcsManagedLedgerOffloadServiceAccountKeyFile",
- this.getGcsManagedLedgerOffloadServiceAccountKeyFile());
- } else if (this.isFileSystemDriver()) {
- setProperty(properties, "fileSystemProfilePath",
this.getFileSystemProfilePath());
- setProperty(properties, "fileSystemURI", this.getFileSystemURI());
- }
-
- setProperty(properties, "managedLedgerOffloadBucket",
this.getManagedLedgerOffloadBucket());
- setProperty(properties, "managedLedgerOffloadRegion",
this.getManagedLedgerOffloadRegion());
- setProperty(properties, "managedLedgerOffloadServiceEndpoint",
- this.getManagedLedgerOffloadServiceEndpoint());
- setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes",
- this.getManagedLedgerOffloadMaxBlockSizeInBytes());
- setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes",
- this.getManagedLedgerOffloadReadBufferSizeInBytes());
-
+ for (Field f : CONFIGURATION_FIELDS) {
+ try {
+ f.setAccessible(true);
+ if ("managedLedgerExtraConfigurations".equals(f.getName())) {
+ Map<String, String> extraConfig = (Map<String, String>)
f.get(this);
+ extraConfig.forEach((key, value) -> {
+ setProperty(properties, EXTRA_CONFIG_PREFIX + key,
value);
+ });
+ } else {
+ setProperty(properties, f.getName(), f.get(this));
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException("An error occurred while
processing the field: " + f.getName(), e);
+ }
+ }
return properties;
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 646167ee4c1..ab216dac030 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.policies.data;
+import static
org.apache.pulsar.common.policies.data.OffloadPoliciesImpl.EXTRA_CONFIG_PREFIX;
+import static org.testng.Assert.assertEquals;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
@@ -26,6 +28,7 @@ import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.testng.Assert;
@@ -375,8 +378,8 @@ public class OffloadPoliciesTest {
@Test
public void testCreateOffloadPoliciesWithExtraConfiguration() {
Properties properties = new Properties();
- properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
- properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
+ properties.put(EXTRA_CONFIG_PREFIX + "Key1", "value1");
+ properties.put(EXTRA_CONFIG_PREFIX + "Key2", "value2");
OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
Map<String, String> extraConfigurations =
policies.getManagedLedgerExtraConfigurations();
@@ -384,4 +387,28 @@ public class OffloadPoliciesTest {
Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
}
+
+ /**
+ * Test toProperties as well as create from properties.
+ * @throws Exception
+ */
+ @Test
+ public void testToProperties() throws Exception {
+ // Base information convert.
+ OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
+ "http://test.endpoint",null, null, null, null, 32 * 1024 *
1024, 5 * 1024 * 1024,
+ 10 * 1024 * 1024L, 10000L,
OffloadedReadPriority.TIERED_STORAGE_FIRST);
+ assertEquals(offloadPolicies,
OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+
+ // Set useless config to offload policies. Make sure convert
conversion result is the same.
+ offloadPolicies.setFileSystemProfilePath("/test/file");
+ assertEquals(offloadPolicies,
OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+
+ // Set extra config to offload policies. Make sure convert conversion
result is the same.
+ Map<String, String> extraConfiguration = new HashMap<>();
+ extraConfiguration.put("key1", "value1");
+ extraConfiguration.put("key2", "value2");
+
offloadPolicies.setManagedLedgerExtraConfigurations(extraConfiguration);
+ assertEquals(offloadPolicies,
OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+ }
}