This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 75779562d60 [improve][offload] Support to configure more offload driver (#20736)
75779562d60 is described below

commit 75779562d604993666c591c24f12b4133f860b5d
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Jul 11 20:42:24 2023 +0800

    [improve][offload] Support to configure more offload driver (#20736)
---
 .../common/policies/data/OffloadPoliciesImpl.java  | 18 +++++-
 .../common/policies/data/OffloadPoliciesTest.java  | 67 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index fb33e3198aa..843c1bde3b9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.stream.Collectors;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -61,12 +62,25 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
         CONFIGURATION_FIELDS = Collections.unmodifiableList(temp);
     }
 
+    public static final ImmutableList<String> INTERNAL_SUPPORTED_DRIVER = ImmutableList.of("S3",
+        "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss");
+    public static final ImmutableList<String> DRIVER_NAMES;
+    static {
+        String extraDrivers = System.getProperty("pulsar.extra.offload.drivers", "");
+        if (extraDrivers.trim().isEmpty()) {
+            DRIVER_NAMES = INTERNAL_SUPPORTED_DRIVER;
+        } else {
+            DRIVER_NAMES = ImmutableList.<String>builder()
+                .addAll(INTERNAL_SUPPORTED_DRIVER)
+                .addAll(Arrays.stream(StringUtils.split(extraDrivers, ','))
+                    .map(String::trim).collect(Collectors.toSet())).build();
+        }
+    }
+
     public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024;   // 64MB
     public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;      // 1MB
     public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
     public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
-    public static final ImmutableList<String> DRIVER_NAMES = ImmutableList
-            .of("S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss");
     public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
     public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS = null;
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 88036b16884..00b9aab0b15 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -18,6 +18,14 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Properties;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -365,4 +373,63 @@ public class OffloadPoliciesTest {
         Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), brokerDeletionLag);
         Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().toString(), brokerReadPriority);
     }
+
+    @Test
+    public void testSupportExtraOffloadDrivers() throws Exception {
+        System.setProperty("pulsar.extra.offload.drivers", "driverA, driverB");
+        // using the custom classloader to reload the offload policies class to read the
+        // system property correctly.
+        TestClassLoader loader = new TestClassLoader();
+        Class<?> clazz = loader.loadClass("org.apache.pulsar.common.policies.data.OffloadPoliciesImpl");
+        Object o = clazz.getDeclaredConstructor().newInstance();
+        clazz.getDeclaredMethod("setManagedLedgerOffloadDriver", String.class).invoke(o, "driverA");
+        Method method = clazz.getDeclaredMethod("driverSupported");
+        Assert.assertEquals(method.invoke(o), true);
+        clazz.getDeclaredMethod("setManagedLedgerOffloadDriver", String.class).invoke(o, "driverB");
+        Assert.assertEquals(method.invoke(o), true);
+        clazz.getDeclaredMethod("setManagedLedgerOffloadDriver", String.class).invoke(o, "driverC");
+        Assert.assertEquals(method.invoke(o), false);
+        clazz.getDeclaredMethod("setManagedLedgerOffloadDriver", String.class).invoke(o, "aws-s3");
+        Assert.assertEquals(method.invoke(o), true);
+    }
+
+    // this is used for the testSupportExtraOffloadDrivers. Because we need to change the system property,
+    // we need to reload the class to read the system property.
+    static class TestClassLoader extends ClassLoader {
+        @Override
+        public Class<?> loadClass(String name) throws ClassNotFoundException {
+            if (name.contains("OffloadPoliciesImpl")) {
+                return getClass(name);
+            }
+            return super.loadClass(name);
+        }
+
+        private Class<?> getClass(String name) {
+            String file = name.replace('.', File.separatorChar) + ".class";
+            Path targetPath = Paths.get(getClass().getClassLoader().getResource(".").getPath()).getParent();
+            file = Paths.get(targetPath.toString(), "classes", file).toString();
+            byte[] byteArr = null;
+            try {
+                byteArr = loadClassData(file);
+                Class<?> c = defineClass(name, byteArr, 0, byteArr.length);
+                resolveClass(c);
+                return c;
+            } catch (Exception e) {
+                e.printStackTrace();
+                return null;
+            }
+        }
+
+        private byte[] loadClassData(String name) throws IOException {
+            InputStream stream = Files.newInputStream(Paths.get(name));
+            int size = stream.available();
+            byte buff[] = new byte[size];
+            DataInputStream in = new DataInputStream(stream);
+            // Reading the binary data
+            in.readFully(buff);
+            in.close();
+            return buff;
+        }
+    }
+
 }

Reply via email to