This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 75779562d60 [improve][offload] Support to configure more offload
driver (#20736)
75779562d60 is described below
commit 75779562d604993666c591c24f12b4133f860b5d
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Jul 11 20:42:24 2023 +0800
[improve][offload] Support to configure more offload driver (#20736)
---
.../common/policies/data/OffloadPoliciesImpl.java | 18 +++++-
.../common/policies/data/OffloadPoliciesTest.java | 67 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 2 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index fb33e3198aa..843c1bde3b9 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.stream.Collectors;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -61,12 +62,25 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
CONFIGURATION_FIELDS = Collections.unmodifiableList(temp);
}
+    /** Offload driver names that are accepted without any extra configuration. */
+    public static final ImmutableList<String> INTERNAL_SUPPORTED_DRIVER = ImmutableList.of("S3",
+            "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss");
+    /**
+     * All accepted offload driver names: the built-in names followed by any extra names listed,
+     * comma-separated, in the {@code pulsar.extra.offload.drivers} system property.
+     * The property is read once, in the static initializer below.
+     */
+    public static final ImmutableList<String> DRIVER_NAMES;
+    static {
+        String extraDrivers = System.getProperty("pulsar.extra.offload.drivers", "");
+        if (extraDrivers.trim().isEmpty()) {
+            DRIVER_NAMES = INTERNAL_SUPPORTED_DRIVER;
+        } else {
+            // Drop blank tokens (e.g. from "a, ,b") so an empty driver name is never accepted, and
+            // de-duplicate with distinct() rather than a Set so the resulting order is deterministic.
+            DRIVER_NAMES = ImmutableList.<String>builder()
+                    .addAll(INTERNAL_SUPPORTED_DRIVER)
+                    .addAll(Arrays.stream(StringUtils.split(extraDrivers, ','))
+                            .map(String::trim)
+                            .filter(name -> !name.isEmpty())
+                            .distinct()
+                            .collect(Collectors.toList()))
+                    .build();
+        }
+    }
+
public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 *
1024; // 64MB
public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;
// 1MB
public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
- public static final ImmutableList<String> DRIVER_NAMES = ImmutableList
- .of("S3", "aws-s3", "google-cloud-storage", "filesystem",
"azureblob", "aliyun-oss");
public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS = null;
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 88036b16884..00b9aab0b15 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -18,6 +18,14 @@
*/
package org.apache.pulsar.common.policies.data;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -365,4 +373,63 @@ public class OffloadPoliciesTest {
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
brokerDeletionLag);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().toString(),
brokerReadPriority);
}
+
+    /**
+     * Verifies that driver names listed in the {@code pulsar.extra.offload.drivers} system
+     * property are reported as supported, alongside a built-in driver, while an unlisted name
+     * is rejected.
+     */
+    @Test
+    public void testSupportExtraOffloadDrivers() throws Exception {
+        final String extraDriversProperty = "pulsar.extra.offload.drivers";
+        final String previousValue = System.getProperty(extraDriversProperty);
+        System.setProperty(extraDriversProperty, "driverA, driverB");
+        try {
+            // Use the custom class loader to reload the offload policies class so that it reads
+            // the system property set above.
+            TestClassLoader loader = new TestClassLoader();
+            Class<?> clazz = loader.loadClass("org.apache.pulsar.common.policies.data.OffloadPoliciesImpl");
+            Object o = clazz.getDeclaredConstructor().newInstance();
+            Method setDriver = clazz.getDeclaredMethod("setManagedLedgerOffloadDriver", String.class);
+            Method driverSupported = clazz.getDeclaredMethod("driverSupported");
+
+            setDriver.invoke(o, "driverA");
+            Assert.assertEquals(driverSupported.invoke(o), true);
+            setDriver.invoke(o, "driverB");
+            Assert.assertEquals(driverSupported.invoke(o), true);
+            setDriver.invoke(o, "driverC");
+            Assert.assertEquals(driverSupported.invoke(o), false);
+            setDriver.invoke(o, "aws-s3");
+            Assert.assertEquals(driverSupported.invoke(o), true);
+        } finally {
+            // Restore the property so the extra drivers do not leak into other tests in this JVM.
+            if (previousValue == null) {
+                System.clearProperty(extraDriversProperty);
+            } else {
+                System.setProperty(extraDriversProperty, previousValue);
+            }
+        }
+    }
+
+    /**
+     * Class loader used by {@code testSupportExtraOffloadDrivers}. Because that test changes a
+     * system property, the OffloadPoliciesImpl class has to be loaded again to read it: this
+     * loader defines its own copy of any class whose name contains "OffloadPoliciesImpl" from
+     * the compiled class file and delegates every other class to the parent loader.
+     */
+    static class TestClassLoader extends ClassLoader {
+        @Override
+        public Class<?> loadClass(String name) throws ClassNotFoundException {
+            if (name.contains("OffloadPoliciesImpl")) {
+                // Defining the same class twice in one loader throws LinkageError, so reuse it.
+                Class<?> alreadyLoaded = findLoadedClass(name);
+                return alreadyLoaded != null ? alreadyLoaded : getClass(name);
+            }
+            return super.loadClass(name);
+        }
+
+        /**
+         * Defines the named class from its .class file on disk.
+         *
+         * @param name binary name of the class to define
+         * @return the newly defined and resolved class
+         * @throws ClassNotFoundException if the class file cannot be read
+         */
+        private Class<?> getClass(String name) throws ClassNotFoundException {
+            String file = name.replace('.', File.separatorChar) + ".class";
+            // Parent directory of the class-path root "." plus "classes" -- presumably
+            // <module>/target/classes when run from the build; verify for other layouts.
+            Path targetPath = Paths.get(getClass().getClassLoader().getResource(".").getPath()).getParent();
+            file = Paths.get(targetPath.toString(), "classes", file).toString();
+            try {
+                byte[] byteArr = loadClassData(file);
+                Class<?> c = defineClass(name, byteArr, 0, byteArr.length);
+                resolveClass(c);
+                return c;
+            } catch (IOException e) {
+                // Fail with the real cause instead of printing it and returning null.
+                throw new ClassNotFoundException("Failed to load class data from " + file, e);
+            }
+        }
+
+        /**
+         * Reads the whole file at the given path into a byte array.
+         *
+         * @param name path of the class file
+         * @return the file's contents
+         * @throws IOException if the file cannot be opened or fully read
+         */
+        private byte[] loadClassData(String name) throws IOException {
+            Path classFile = Paths.get(name);
+            // Size the buffer from the file length; InputStream.available() is only an estimate.
+            byte[] buff = new byte[(int) Files.size(classFile)];
+            // try-with-resources closes the stream even when reading fails.
+            try (InputStream stream = Files.newInputStream(classFile);
+                 DataInputStream in = new DataInputStream(stream)) {
+                in.readFully(buff);
+            }
+            return buff;
+        }
+    }
+
}