This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 31907c4ac59 [fix][offload] fix offload metrics error (#20366)
31907c4ac59 is described below

commit 31907c4ac59b3eec1a0008055c4c0099eed7403c
Author: YingQun Zhong <[email protected]>
AuthorDate: Wed May 31 15:58:51 2023 +0800

    [fix][offload] fix offload metrics error (#20366)
---
 .../org/apache/pulsar/common/naming/TopicName.java | 36 ++++++++++++++++++++++
 .../apache/pulsar/common/naming/TopicNameTest.java | 34 ++++++++++++++++++++
 .../impl/FileStoreBackedReadHandleImpl.java        | 11 ++++---
 .../impl/FileSystemManagedLedgerOffloader.java     | 22 ++++++++-----
 .../impl/FileSystemManagedLedgerOffloaderTest.java | 24 ++++++++-------
 .../impl/BlobStoreBackedInputStreamImpl.java       |  9 ++++--
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java |  4 ++-
 .../impl/BlobStoreBackedReadHandleImplV2.java      |  4 ++-
 .../impl/BlobStoreManagedLedgerOffloader.java      | 12 +++++---
 .../impl/BlockAwareSegmentInputStreamImpl.java     |  7 +++--
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  9 +++---
 11 files changed, 135 insertions(+), 37 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 5f03c60ece9..d8acedd22a9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -329,6 +329,42 @@ public class TopicName implements ServiceUnitId {
         }
     }
 
+    /**
+     * get topic full name from managedLedgerName.
+     *
+     * @return the topic full name, format -> domain://tenant/namespace/topic
+     */
+    public static String fromPersistenceNamingEncoding(String mlName) {
+        // The managedLedgerName convention is: tenant/namespace/domain/topic
+        // We want to transform to topic full name in the order: domain://tenant/namespace/topic
+        if (mlName == null || mlName.length() == 0) {
+            return mlName;
+        }
+        List<String> parts = Splitter.on("/").splitToList(mlName);
+        String tenant;
+        String cluster;
+        String namespacePortion;
+        String domain;
+        String localName;
+        if (parts.size() == 4) {
+            tenant = parts.get(0);
+            cluster = null;
+            namespacePortion = parts.get(1);
+            domain = parts.get(2);
+            localName = parts.get(3);
+            return String.format("%s://%s/%s/%s", domain, tenant, namespacePortion, localName);
+        } else if (parts.size() == 5) {
+            tenant = parts.get(0);
+            cluster = parts.get(1);
+            namespacePortion = parts.get(2);
+            domain = parts.get(3);
+            localName = parts.get(4);
+            return String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, namespacePortion, localName);
+        } else {
+            throw new IllegalArgumentException("Invalid managedLedger name: " + mlName);
+        }
+    }
+
     /**
      * Get a string suitable for completeTopicName lookup.
      *
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 6107f5d4042..11adad7e7c7 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -236,6 +236,40 @@ public class TopicNameTest {
         assertEquals(name.getPersistenceNamingEncoding(), 
"prop/colo/ns/persistent/" + encodedName);
     }
 
+    @Test
+    public void testFromPersistenceNamingEncoding() {
+        // case1: V2
+        String mlName1 = 
"public_tenant/default_namespace/persistent/test_topic";
+        String expectedTopicName1 = 
"persistent://public_tenant/default_namespace/test_topic";
+
+        TopicName name1 = TopicName.get(expectedTopicName1);
+        assertEquals(name1.getPersistenceNamingEncoding(), mlName1);
+        assertEquals(TopicName.fromPersistenceNamingEncoding(mlName1), 
expectedTopicName1);
+
+        // case2: V1
+        String mlName2 = 
"public_tenant/my_cluster/default_namespace/persistent/test_topic";
+        String expectedTopicName2 = 
"persistent://public_tenant/my_cluster/default_namespace/test_topic";
+
+        TopicName name2 = TopicName.get(expectedTopicName2);
+        assertEquals(name2.getPersistenceNamingEncoding(), mlName2);
+        assertEquals(TopicName.fromPersistenceNamingEncoding(mlName2), 
expectedTopicName2);
+
+        // case3: null
+        String mlName3 = "";
+        String expectedTopicName3 = "";
+        assertEquals(expectedTopicName3, 
TopicName.fromPersistenceNamingEncoding(mlName3));
+
+        // case4: Invalid name
+        try {
+            String mlName4 = 
"public_tenant/my_cluster/default_namespace/persistent/test_topic/sub_topic";
+            TopicName.fromPersistenceNamingEncoding(mlName4);
+            fail("Should have raised exception");
+        } catch (IllegalArgumentException e) {
+            // Exception is expected.
+        }
+    }
+
+
     @SuppressWarnings("deprecation")
     @Test
     public void testTopicNameWithoutCluster() throws Exception {
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 5d10b63de34..f31ce56b603 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
+import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +52,7 @@ public class FileStoreBackedReadHandleImpl implements 
ReadHandle {
     private final LedgerMetadata ledgerMetadata;
     private final LedgerOffloaderStats offloaderStats;
     private final String managedLedgerName;
+    private final String topicName;
 
     private FileStoreBackedReadHandleImpl(ExecutorService executor, 
MapFile.Reader reader, long ledgerId,
                                           LedgerOffloaderStats offloaderStats,
@@ -60,13 +62,14 @@ public class FileStoreBackedReadHandleImpl implements 
ReadHandle {
         this.reader = reader;
         this.offloaderStats = offloaderStats;
         this.managedLedgerName = managedLedgerName;
+        this.topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
         LongWritable key = new LongWritable();
         BytesWritable value = new BytesWritable();
         try {
             key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
             long startReadIndexTime = System.nanoTime();
             reader.get(key, value);
-            offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+            offloaderStats.recordReadOffloadIndexLatency(topicName,
                     System.nanoTime() - startReadIndexTime, 
TimeUnit.NANOSECONDS);
             this.ledgerMetadata = parseLedgerMetadata(ledgerId, 
value.copyBytes());
         } catch (IOException e) {
@@ -125,7 +128,7 @@ public class FileStoreBackedReadHandleImpl implements 
ReadHandle {
                 while (entriesToRead > 0) {
                     long startReadTime = System.nanoTime();
                     reader.next(key, value);
-                    
this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+                    this.offloaderStats.recordReadOffloadDataLatency(topicName,
                             System.nanoTime() - startReadTime, 
TimeUnit.NANOSECONDS);
                     int length = value.getLength();
                     long entryId = key.get();
@@ -135,7 +138,7 @@ public class FileStoreBackedReadHandleImpl implements 
ReadHandle {
                         buf.writeBytes(value.copyBytes());
                         entriesToRead--;
                         nextExpectedId++;
-                        
this.offloaderStats.recordReadOffloadBytes(managedLedgerName, length);
+                        this.offloaderStats.recordReadOffloadBytes(topicName, 
length);
                     } else if (entryId > lastEntry) {
                         log.info("Expected to read {}, but read {}, which is 
greater than last entry {}",
                                 nextExpectedId, entryId, lastEntry);
@@ -144,7 +147,7 @@ public class FileStoreBackedReadHandleImpl implements 
ReadHandle {
             }
                 promise.complete(LedgerEntriesImpl.create(entries));
             } catch (Throwable t) {
-                this.offloaderStats.recordReadOffloadError(managedLedgerName);
+                this.offloaderStats.recordReadOffloadError(topicName);
                 promise.completeExceptionally(t);
                 entries.forEach(LedgerEntry::close);
             }
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 8e87c230adb..030b8c83f06 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,9 +198,10 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
                 return;
             }
             long ledgerId = readHandle.getId();
-            final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
-            String storagePath = getStoragePath(storageBasePath, topicName);
+            final String managedLedgerName = 
extraMetadata.get(MANAGED_LEDGER_NAME);
+            String storagePath = getStoragePath(storageBasePath, 
managedLedgerName);
             String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
+            final String topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
             LongWritable key = new LongWritable();
             BytesWritable value = new BytesWritable();
             try {
@@ -241,7 +243,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
                 promise.complete(null);
             } catch (Exception e) {
                 log.error("Exception when get CompletableFuture<LedgerEntries> 
: ManagerLedgerName: {}, "
-                        + "LedgerId: {}, UUID: {} ", topicName, ledgerId, 
uuid, e);
+                        + "LedgerId: {}, UUID: {} ", managedLedgerName, 
ledgerId, uuid, e);
                 if (e instanceof InterruptedException) {
                     Thread.currentThread().interrupt();
                 }
@@ -306,22 +308,27 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
         @Override
         public void run() {
             String managedLedgerName = 
ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME);
+            String topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
             if (ledgerReader.fileSystemWriteException == null) {
                 Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
                 while (iterator.hasNext()) {
                     LedgerEntry entry = iterator.next();
                     long entryId = entry.getEntryId();
                     key.set(entryId);
+                    byte[] currentEntryBytes;
+                    int currentEntrySize;
                     try {
-                        value.set(entry.getEntryBytes(), 0, 
entry.getEntryBytes().length);
+                        currentEntryBytes = entry.getEntryBytes();
+                        currentEntrySize = currentEntryBytes.length;
+                        value.set(currentEntryBytes, 0, currentEntrySize);
                         dataWriter.append(key, value);
                     } catch (IOException e) {
                         ledgerReader.fileSystemWriteException = e;
-                        ledgerReader.offloaderStats.recordWriteToStorageError(managedLedgerName);
+                        ledgerReader.offloaderStats.recordWriteToStorageError(topicName);
                         break;
                     }
                     haveOffloadEntryNumber.incrementAndGet();
-                    
ledgerReader.offloaderStats.recordOffloadBytes(managedLedgerName, 
entry.getLength());
+                    ledgerReader.offloaderStats.recordOffloadBytes(topicName, 
currentEntrySize);
                 }
             }
             countDownLatch.countDown();
@@ -367,6 +374,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
         String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME);
         String storagePath = getStoragePath(storageBasePath, ledgerName);
         String dataFilePath = getDataFilePath(storagePath, ledgerId, uid);
+        String topicName = TopicName.fromPersistenceNamingEncoding(ledgerName);
         CompletableFuture<Void> promise = new CompletableFuture<>();
         try {
             fileSystem.delete(new Path(dataFilePath), true);
@@ -376,7 +384,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
             promise.completeExceptionally(e);
         }
         return promise.whenComplete((__, t) ->
-                this.offloaderStats.recordDeleteOffloadOps(ledgerName, t == 
null));
+                this.offloaderStats.recordDeleteOffloadOps(topicName, t == 
null));
     }
 
     @Override
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index 3f9dbb35551..8dd25953bb2 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -32,6 +32,7 @@ import 
org.apache.bookkeeper.mledger.offload.filesystem.FileStoreTestBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.net.URI;
@@ -46,8 +47,9 @@ import static org.testng.Assert.assertTrue;
 
 public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
     private final PulsarMockBookKeeper bk;
-    private String topic = "public/default/testOffload";
-    private String storagePath = createStoragePath(topic);
+    private String managedLedgerName = "public/default/persistent/testOffload";
+    private String topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
+    private String storagePath = createStoragePath(managedLedgerName);
     private LedgerHandle lh;
     private ReadHandle toWrite;
     private final int numberOfEntries = 601;
@@ -56,7 +58,7 @@ public class FileSystemManagedLedgerOffloaderTest extends 
FileStoreTestBase {
     public FileSystemManagedLedgerOffloaderTest() throws Exception {
         this.bk = new PulsarMockBookKeeper(scheduler);
         this.toWrite = buildReadHandle();
-        map.put("ManagedLedgerName", topic);
+        map.put("ManagedLedgerName", managedLedgerName);
     }
 
     private ReadHandle buildReadHandle() throws Exception {
@@ -125,10 +127,10 @@ public class FileSystemManagedLedgerOffloaderTest extends 
FileStoreTestBase {
         offloader.offload(toWrite, uuid, map).get();
 
         LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) 
this.offloaderStats;
-        assertTrue(offloaderStats.getOffloadError(topic) == 0);
-        assertTrue(offloaderStats.getOffloadBytes(topic) > 0);
-        assertTrue(offloaderStats.getReadLedgerLatency(topic).count > 0);
-        assertTrue(offloaderStats.getWriteStorageError(topic) == 0);
+        assertTrue(offloaderStats.getOffloadError(topicName) == 0);
+        assertTrue(offloaderStats.getOffloadBytes(topicName) > 0);
+        assertTrue(offloaderStats.getReadLedgerLatency(topicName).count > 0);
+        assertTrue(offloaderStats.getWriteStorageError(topicName) == 0);
 
         ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
map).get();
         LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
@@ -137,10 +139,10 @@ public class FileSystemManagedLedgerOffloaderTest extends 
FileStoreTestBase {
             LedgerEntry toTestEntry = toTestIter.next();
         }
 
-        assertTrue(offloaderStats.getReadOffloadError(topic) == 0);
-        assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0);
-        assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0);
-        assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0);
+        assertTrue(offloaderStats.getReadOffloadError(topicName) == 0);
+        assertTrue(offloaderStats.getReadOffloadBytes(topicName) > 0);
+        assertTrue(offloaderStats.getReadOffloadDataLatency(topicName).count > 0);
+        assertTrue(offloaderStats.getReadOffloadIndexLatency(topicName).count > 0);
     }
 
     @Test
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index f371e7d9a1a..311568b43da 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.blobstore.options.GetOptions;
@@ -44,6 +45,7 @@ public class BlobStoreBackedInputStreamImpl extends 
BackedInputStream {
     private final int bufferSize;
     private LedgerOffloaderStats offloaderStats;
     private String managedLedgerName;
+    private String topicName;
 
     private long cursor;
     private long bufferOffsetStart;
@@ -71,6 +73,7 @@ public class BlobStoreBackedInputStreamImpl extends 
BackedInputStream {
         this(blobStore, bucket, key, versionCheck, objectLen, bufferSize);
         this.offloaderStats = offloaderStats;
         this.managedLedgerName = managedLedgerName;
+        this.topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
     }
 
     /**
@@ -110,13 +113,13 @@ public class BlobStoreBackedInputStreamImpl extends 
BackedInputStream {
                 // because JClouds streams the content
                 // and actually the HTTP call finishes when the stream is 
fully read
                 if (this.offloaderStats != null) {
-                    
this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+                    this.offloaderStats.recordReadOffloadDataLatency(topicName,
                             System.nanoTime() - startReadTime, 
TimeUnit.NANOSECONDS);
-                    
this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - 
startRange + 1);
+                    this.offloaderStats.recordReadOffloadBytes(topicName, 
endRange - startRange + 1);
                 }
             } catch (Throwable e) {
                 if (null != this.offloaderStats) {
-                    
this.offloaderStats.recordReadOffloadError(this.managedLedgerName);
+                    this.offloaderStats.recordReadOffloadError(this.topicName);
                 }
                 throw new IOException("Error reading from BlobStore", e);
             }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 499084ab9cd..f1613d83240 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -45,6 +45,7 @@ import 
org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
@@ -261,6 +262,7 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
         int retryCount = 3;
         OffloadIndexBlock index = null;
         IOException lastException = null;
+        String topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
         // The following retry is used to avoid to some network issue cause 
read index file failure.
         // If it can not recovery in the retry, we will throw the exception 
and the dispatcher will schedule to
         // next read.
@@ -269,7 +271,7 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
         while (retryCount-- > 0) {
             long readIndexStartTime = System.nanoTime();
             Blob blob = blobStore.getBlob(bucket, indexKey);
-            offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+            offloaderStats.recordReadOffloadIndexLatency(topicName,
                     System.nanoTime() - readIndexStartTime, 
TimeUnit.NANOSECONDS);
             versionCheck.check(indexKey, blob);
             OffloadIndexBlockBuilder indexBuilder = 
OffloadIndexBlockBuilder.create();
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index 495a6e2fcb3..1f22cd31e16 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -46,6 +46,7 @@ import 
org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
@@ -297,13 +298,14 @@ public class BlobStoreBackedReadHandleImplV2 implements 
ReadHandle {
             throws IOException {
         List<BackedInputStream> inputStreams = new LinkedList<>();
         List<OffloadIndexBlockV2> indice = new LinkedList<>();
+        String topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
         for (int i = 0; i < indexKeys.size(); i++) {
             String indexKey = indexKeys.get(i);
             String key = keys.get(i);
             log.debug("open bucket: {} index key: {}", bucket, indexKey);
             long startTime = System.nanoTime();
             Blob blob = blobStore.getBlob(bucket, indexKey);
-            offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+            offloaderStats.recordReadOffloadIndexLatency(topicName,
                     System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
             log.debug("indexKey blob: {} {}", indexKey, blob);
             versionCheck.check(indexKey, blob);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 606934b5f0e..e6da66a4e8c 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -63,6 +63,7 @@ import 
org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.BlobStoreLocation;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
@@ -176,7 +177,8 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
     public CompletableFuture<Void> offload(ReadHandle readHandle,
                                            UUID uuid,
                                            Map<String, String> extraMetadata) {
-        final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
+        final String managedLedgerName = 
extraMetadata.get(MANAGED_LEDGER_NAME);
+        final String topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
         final BlobStore writeBlobStore = 
blobStores.get(config.getBlobStoreLocation());
         log.info("offload {} uuid {} extraMetadata {} to {} {}", 
readHandle.getId(), uuid, extraMetadata,
                 config.getBlobStoreLocation(), writeBlobStore);
@@ -226,7 +228,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                         .calculateBlockSize(config.getMaxBlockSizeInBytes(), 
readHandle, startEntry, entryBytesWritten);
 
                     try (BlockAwareSegmentInputStream blockStream = new 
BlockAwareSegmentInputStreamImpl(
-                            readHandle, startEntry, blockSize, 
this.offloaderStats, topicName)) {
+                            readHandle, startEntry, blockSize, 
this.offloaderStats, managedLedgerName)) {
 
                         Payload partPayload = 
Payloads.newInputStreamPayload(blockStream);
                         
partPayload.getContentMetadata().setContentLength((long) blockSize);
@@ -611,7 +613,8 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
 
         return promise.whenComplete((__, t) -> {
             if (null != this.ml) {
-                this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(), 
t == null);
+                this.offloaderStats.recordDeleteOffloadOps(
+                  TopicName.fromPersistenceNamingEncoding(this.ml.getName()), 
t == null);
             }
         });
     }
@@ -636,7 +639,8 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         });
 
         return promise.whenComplete((__, t) ->
-                this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(), 
t == null));
+                this.offloaderStats.recordDeleteOffloadOps(
+                  TopicName.fromPersistenceNamingEncoding(this.ml.getName()), 
t == null));
     }
 
     @Override
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index 3c92051c95d..e35debe634e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +74,7 @@ public class BlockAwareSegmentInputStreamImpl extends 
BlockAwareSegmentInputStre
     // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry 
header and entry content.
     private List<ByteBuf> entriesByteBuf = null;
     private LedgerOffloaderStats offloaderStats;
+    private String managedLedgerName;
     private String topicName;
     private int currentOffset = 0;
     private final AtomicBoolean close = new AtomicBoolean(false);
@@ -91,7 +93,8 @@ public class BlockAwareSegmentInputStreamImpl extends 
BlockAwareSegmentInputStre
                                             LedgerOffloaderStats 
offloaderStats, String ledgerName) {
         this(ledger, startEntryId, blockSize);
         this.offloaderStats = offloaderStats;
-        this.topicName = ledgerName;
+        this.managedLedgerName = ledgerName;
+        this.topicName = TopicName.fromPersistenceNamingEncoding(ledgerName);
     }
 
     private ByteBuf readEntries(int len) throws IOException {
@@ -183,7 +186,7 @@ public class BlockAwareSegmentInputStreamImpl extends 
BlockAwareSegmentInputStre
                 log.debug("read ledger entries. start: {}, end: {} cost {}", 
start, end,
                         TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - 
startTime));
             }
-            if (offloaderStats != null && topicName != null) {
+            if (offloaderStats != null && managedLedgerName != null) {
                 offloaderStats.recordReadLedgerLatency(topicName, 
System.nanoTime() - startTime,
                         TimeUnit.NANOSECONDS);
             }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 6f499d153e2..3b849252258 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -49,6 +49,7 @@ import 
org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
 import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.pulsar.common.naming.TopicName;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.options.CopyOptions;
 import org.mockito.Mockito;
@@ -172,10 +173,10 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
         LedgerOffloader offloader = getOffloader();
 
         UUID uuid = UUID.randomUUID();
-
-        String topic = "test";
+        String managedLegerName = "public/default/persistent/testOffload";
+        String topic = 
TopicName.fromPersistenceNamingEncoding(managedLegerName);
         Map<String, String> extraMap = new HashMap<>();
-        extraMap.put("ManagedLedgerName", topic);
+        extraMap.put("ManagedLedgerName", managedLegerName);
         offloader.offload(toWrite, uuid, extraMap).get();
 
         LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) 
this.offloaderStats;
@@ -187,7 +188,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
 
         Map<String, String> map = new HashMap<>();
         map.putAll(offloader.getOffloadDriverMetadata());
-        map.put("ManagedLedgerName", topic);
+        map.put("ManagedLedgerName", managedLegerName);
         ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
map).get();
         LedgerEntries toTestEntries = toTest.read(0, 
toTest.getLastAddConfirmed());
         Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();

Reply via email to