This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c1cc2d6f413ef33a9d866b3100b2a85b9a0f901a
Author: Yong Zhang <[email protected]>
AuthorDate: Mon Nov 18 10:09:26 2024 +0800

    [improve][offload] Use filesystemURI as the storage path (#23591)
    
    (cherry picked from commit b915f6e73e5b8cfa94c4c55eb930befc682674d4)
---
 .../impl/FileSystemManagedLedgerOffloader.java     |   2 +-
 .../impl/FileSystemOffloaderLocalFileTest.java     | 119 +++++++++++++++++++++
 .../resources/filesystem_offload_core_site.xml     |  48 +++++++++
 3 files changed, 168 insertions(+), 1 deletion(-)

diff --git 
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
 
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 25b63374946..18d7eccff35 100644
--- 
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ 
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -105,7 +105,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
 
         
this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
         this.driverName = conf.getManagedLedgerOffloadDriver();
-        this.storageBasePath = configuration.get("hadoop.tmp.dir");
+        this.storageBasePath = configuration.get("fs.defaultFS");
         this.scheduler = scheduler;
         this.fileSystem = FileSystem.get(configuration);
         this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git 
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
 
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
new file mode 100644
index 00000000000..14734b3faca
--- /dev/null
+++ 
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.testng.annotations.Test;
+
+public class FileSystemOffloaderLocalFileTest {
+    private OrderedScheduler scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+    private LedgerOffloaderStats offloaderStats = 
LedgerOffloaderStats.create(true, true, scheduler, 60);
+
+
+    private String getResourceFilePath(String name) {
+        return getClass().getClassLoader().getResource(name).getPath();
+    }
+
+    @Test
+    public void testReadWriteWithLocalFileUsingFileSystemURI() throws 
Exception {
+        // prepare the offload policies
+        final String basePath = "/tmp";
+        OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
+        offloadPolicies.setFileSystemURI("file://" + basePath);
+        offloadPolicies.setManagedLedgerOffloadDriver("filesystem");
+        
offloadPolicies.setFileSystemProfilePath(getResourceFilePath("filesystem_offload_core_site.xml"));
+
+        // initialize the offloader with the offload policies
+        var offloader = 
FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, 
offloaderStats);
+
+        int numberOfEntries = 100;
+
+        // prepare the data in bookkeeper
+        BookKeeper bk = new PulsarMockBookKeeper(scheduler);
+        LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, 
"".getBytes());
+        for (int i = 0; i <  numberOfEntries; i++) {
+            byte[] entry = ("foobar"+i).getBytes();
+            lh.addEntry(entry);
+        }
+        lh.close();
+
+        ReadHandle read = bk.newOpenLedgerOp()
+            .withLedgerId(lh.getId())
+            .withDigestType(DigestType.CRC32)
+            .withPassword("".getBytes()).execute().get();
+
+        final String mlName = 
TopicName.get("testWriteLocalFIle").getPersistenceNamingEncoding();
+        Map<String, String> offloadDriverMetadata = new HashMap<>();
+        offloadDriverMetadata.put("ManagedLedgerName", mlName);
+
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(read, uuid, offloadDriverMetadata).get();
+        ReadHandle toTest = offloader.readOffloaded(read.getId(), uuid, 
offloadDriverMetadata).get();
+        assertEquals(toTest.getLastAddConfirmed(), read.getLastAddConfirmed());
+        LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
+        LedgerEntries toWriteEntries = read.read(0,numberOfEntries - 1);
+        Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+        Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator();
+        while(toTestIter.hasNext()) {
+            LedgerEntry toWriteEntry = toWriteIter.next();
+            LedgerEntry toTestEntry = toTestIter.next();
+
+            assertEquals(toWriteEntry.getLedgerId(), 
toTestEntry.getLedgerId());
+            assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+            assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+            assertEquals(toWriteEntry.getEntryBuffer(), 
toTestEntry.getEntryBuffer());
+        }
+        toTestEntries = toTest.read(1, numberOfEntries - 1);
+        toWriteEntries = read.read(1,numberOfEntries - 1);
+        toTestIter = toTestEntries.iterator();
+        toWriteIter = toWriteEntries.iterator();
+        while(toTestIter.hasNext()) {
+            LedgerEntry toWriteEntry = toWriteIter.next();
+            LedgerEntry toTestEntry = toTestIter.next();
+
+            assertEquals(toWriteEntry.getLedgerId(), 
toTestEntry.getLedgerId());
+            assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+            assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+            assertEquals(toWriteEntry.getEntryBuffer(), 
toTestEntry.getEntryBuffer());
+        }
+
+        // check the file located in the local file system
+        Path offloadedFilePath = Paths.get(basePath, mlName);
+        assertEquals(Files.exists(offloadedFilePath), true);
+    }
+}
diff --git 
a/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml
 
b/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml
new file mode 100644
index 00000000000..d26cec2cc60
--- /dev/null
+++ 
b/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml
@@ -0,0 +1,48 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+    <!--file system uri, necessary-->
+    <property>
+        <name>fs.defaultFS</name>
+        <value></value>
+    </property>
+    <property>
+        <name>hadoop.tmp.dir</name>
+        <value>pulsar</value>
+    </property>
+    <property>
+        <name>io.file.buffer.size</name>
+        <value>4096</value>
+    </property>
+    <property>
+        <name>io.seqfile.compress.blocksize</name>
+        <value>1000000</value>
+    </property>
+    <property>
+        <name>io.seqfile.compression.type</name>
+        <value>BLOCK</value>
+    </property>
+    <property>
+        <name>io.map.index.interval</name>
+        <value>128</value>
+    </property>
+
+</configuration>

Reply via email to