This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 31907c4ac59 [fix][offload] fix offload metrics error (#20366)
31907c4ac59 is described below
commit 31907c4ac59b3eec1a0008055c4c0099eed7403c
Author: YingQun Zhong <[email protected]>
AuthorDate: Wed May 31 15:58:51 2023 +0800
[fix][offload] fix offload metrics error (#20366)
---
.../org/apache/pulsar/common/naming/TopicName.java | 36 ++++++++++++++++++++++
.../apache/pulsar/common/naming/TopicNameTest.java | 34 ++++++++++++++++++++
.../impl/FileStoreBackedReadHandleImpl.java | 11 ++++---
.../impl/FileSystemManagedLedgerOffloader.java | 22 ++++++++-----
.../impl/FileSystemManagedLedgerOffloaderTest.java | 24 ++++++++-------
.../impl/BlobStoreBackedInputStreamImpl.java | 9 ++++--
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 4 ++-
.../impl/BlobStoreBackedReadHandleImplV2.java | 4 ++-
.../impl/BlobStoreManagedLedgerOffloader.java | 12 +++++---
.../impl/BlockAwareSegmentInputStreamImpl.java | 7 +++--
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 9 +++---
11 files changed, 135 insertions(+), 37 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 5f03c60ece9..d8acedd22a9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -329,6 +329,42 @@ public class TopicName implements ServiceUnitId {
}
}
+ /**
+ * get topic full name from managedLedgerName.
+ *
+ * @return the topic full name, format -> domain://tenant/namespace/topic
+ */
+ public static String fromPersistenceNamingEncoding(String mlName) {
+ // The managedLedgerName convention is: tenant/namespace/domain/topic
+ // We want to transform to topic full name in the order:
domain://tenant/namespace/topic
+ if (mlName == null || mlName.length() == 0) {
+ return mlName;
+ }
+ List<String> parts = Splitter.on("/").splitToList(mlName);
+ String tenant;
+ String cluster;
+ String namespacePortion;
+ String domain;
+ String localName;
+ if (parts.size() == 4) {
+ tenant = parts.get(0);
+ cluster = null;
+ namespacePortion = parts.get(1);
+ domain = parts.get(2);
+ localName = parts.get(3);
+ return String.format("%s://%s/%s/%s", domain, tenant,
namespacePortion, localName);
+ } else if (parts.size() == 5) {
+ tenant = parts.get(0);
+ cluster = parts.get(1);
+ namespacePortion = parts.get(2);
+ domain = parts.get(3);
+ localName = parts.get(4);
+ return String.format("%s://%s/%s/%s/%s", domain, tenant, cluster,
namespacePortion, localName);
+ } else {
+ throw new IllegalArgumentException("Invalid managedLedger name: "
+ mlName);
+ }
+ }
+
/**
* Get a string suitable for completeTopicName lookup.
*
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 6107f5d4042..11adad7e7c7 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -236,6 +236,40 @@ public class TopicNameTest {
assertEquals(name.getPersistenceNamingEncoding(),
"prop/colo/ns/persistent/" + encodedName);
}
+ @Test
+ public void testFromPersistenceNamingEncoding() {
+ // case1: V2
+ String mlName1 =
"public_tenant/default_namespace/persistent/test_topic";
+ String expectedTopicName1 =
"persistent://public_tenant/default_namespace/test_topic";
+
+ TopicName name1 = TopicName.get(expectedTopicName1);
+ assertEquals(name1.getPersistenceNamingEncoding(), mlName1);
+ assertEquals(TopicName.fromPersistenceNamingEncoding(mlName1),
expectedTopicName1);
+
+ // case2: V1
+ String mlName2 =
"public_tenant/my_cluster/default_namespace/persistent/test_topic";
+ String expectedTopicName2 =
"persistent://public_tenant/my_cluster/default_namespace/test_topic";
+
+ TopicName name2 = TopicName.get(expectedTopicName2);
+ assertEquals(name2.getPersistenceNamingEncoding(), mlName2);
+ assertEquals(TopicName.fromPersistenceNamingEncoding(mlName2),
expectedTopicName2);
+
+ // case3: null
+ String mlName3 = "";
+ String expectedTopicName3 = "";
+ assertEquals(expectedTopicName3,
TopicName.fromPersistenceNamingEncoding(mlName3));
+
+ // case4: Invalid name
+ try {
+ String mlName4 =
"public_tenant/my_cluster/default_namespace/persistent/test_topic/sub_topic";
+ TopicName.fromPersistenceNamingEncoding(mlName4);
+ fail("Should have raised exception");
+ } catch (IllegalArgumentException e) {
+ // Exception is expected.
+ }
+ }
+
+
@SuppressWarnings("deprecation")
@Test
public void testTopicNameWithoutCluster() throws Exception {
diff --git
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 5d10b63de34..f31ce56b603 100644
---
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
+import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ public class FileStoreBackedReadHandleImpl implements
ReadHandle {
private final LedgerMetadata ledgerMetadata;
private final LedgerOffloaderStats offloaderStats;
private final String managedLedgerName;
+ private final String topicName;
private FileStoreBackedReadHandleImpl(ExecutorService executor,
MapFile.Reader reader, long ledgerId,
LedgerOffloaderStats offloaderStats,
@@ -60,13 +62,14 @@ public class FileStoreBackedReadHandleImpl implements
ReadHandle {
this.reader = reader;
this.offloaderStats = offloaderStats;
this.managedLedgerName = managedLedgerName;
+ this.topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
try {
key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
long startReadIndexTime = System.nanoTime();
reader.get(key, value);
- offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+ offloaderStats.recordReadOffloadIndexLatency(topicName,
System.nanoTime() - startReadIndexTime,
TimeUnit.NANOSECONDS);
this.ledgerMetadata = parseLedgerMetadata(ledgerId,
value.copyBytes());
} catch (IOException e) {
@@ -125,7 +128,7 @@ public class FileStoreBackedReadHandleImpl implements
ReadHandle {
while (entriesToRead > 0) {
long startReadTime = System.nanoTime();
reader.next(key, value);
-
this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+ this.offloaderStats.recordReadOffloadDataLatency(topicName,
System.nanoTime() - startReadTime,
TimeUnit.NANOSECONDS);
int length = value.getLength();
long entryId = key.get();
@@ -135,7 +138,7 @@ public class FileStoreBackedReadHandleImpl implements
ReadHandle {
buf.writeBytes(value.copyBytes());
entriesToRead--;
nextExpectedId++;
-
this.offloaderStats.recordReadOffloadBytes(managedLedgerName, length);
+ this.offloaderStats.recordReadOffloadBytes(topicName,
length);
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is
greater than last entry {}",
nextExpectedId, entryId, lastEntry);
@@ -144,7 +147,7 @@ public class FileStoreBackedReadHandleImpl implements
ReadHandle {
}
promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
- this.offloaderStats.recordReadOffloadError(managedLedgerName);
+ this.offloaderStats.recordReadOffloadError(topicName);
promise.completeExceptionally(t);
entries.forEach(LedgerEntry::close);
}
diff --git
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 8e87c230adb..030b8c83f06 100644
---
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,9 +198,10 @@ public class FileSystemManagedLedgerOffloader implements
LedgerOffloader {
return;
}
long ledgerId = readHandle.getId();
- final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
- String storagePath = getStoragePath(storageBasePath, topicName);
+ final String managedLedgerName =
extraMetadata.get(MANAGED_LEDGER_NAME);
+ String storagePath = getStoragePath(storageBasePath,
managedLedgerName);
String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
+ final String topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
try {
@@ -241,7 +243,7 @@ public class FileSystemManagedLedgerOffloader implements
LedgerOffloader {
promise.complete(null);
} catch (Exception e) {
log.error("Exception when get CompletableFuture<LedgerEntries>
: ManagerLedgerName: {}, "
- + "LedgerId: {}, UUID: {} ", topicName, ledgerId,
uuid, e);
+ + "LedgerId: {}, UUID: {} ", managedLedgerName,
ledgerId, uuid, e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
@@ -306,22 +308,27 @@ public class FileSystemManagedLedgerOffloader implements
LedgerOffloader {
@Override
public void run() {
String managedLedgerName =
ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME);
+ String topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
if (ledgerReader.fileSystemWriteException == null) {
Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
while (iterator.hasNext()) {
LedgerEntry entry = iterator.next();
long entryId = entry.getEntryId();
key.set(entryId);
+ byte[] currentEntryBytes;
+ int currentEntrySize;
try {
- value.set(entry.getEntryBytes(), 0,
entry.getEntryBytes().length);
+ currentEntryBytes = entry.getEntryBytes();
+ currentEntrySize = currentEntryBytes.length;
+ value.set(currentEntryBytes, 0, currentEntrySize);
dataWriter.append(key, value);
} catch (IOException e) {
ledgerReader.fileSystemWriteException = e;
-
ledgerReader.offloaderStats.recordWriteToStorageError(managedLedgerName);
+
ledgerReader.offloaderStats.recordWriteToStorageError(topicName);
break;
}
haveOffloadEntryNumber.incrementAndGet();
-
ledgerReader.offloaderStats.recordOffloadBytes(managedLedgerName,
entry.getLength());
+ ledgerReader.offloaderStats.recordOffloadBytes(topicName,
currentEntrySize);
}
}
countDownLatch.countDown();
@@ -367,6 +374,7 @@ public class FileSystemManagedLedgerOffloader implements
LedgerOffloader {
String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME);
String storagePath = getStoragePath(storageBasePath, ledgerName);
String dataFilePath = getDataFilePath(storagePath, ledgerId, uid);
+ String topicName = TopicName.fromPersistenceNamingEncoding(ledgerName);
CompletableFuture<Void> promise = new CompletableFuture<>();
try {
fileSystem.delete(new Path(dataFilePath), true);
@@ -376,7 +384,7 @@ public class FileSystemManagedLedgerOffloader implements
LedgerOffloader {
promise.completeExceptionally(e);
}
return promise.whenComplete((__, t) ->
- this.offloaderStats.recordDeleteOffloadOps(ledgerName, t ==
null));
+ this.offloaderStats.recordDeleteOffloadOps(topicName, t ==
null));
}
@Override
diff --git
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index 3f9dbb35551..8dd25953bb2 100644
---
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -32,6 +32,7 @@ import
org.apache.bookkeeper.mledger.offload.filesystem.FileStoreTestBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.net.URI;
@@ -46,8 +47,9 @@ import static org.testng.Assert.assertTrue;
public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
private final PulsarMockBookKeeper bk;
- private String topic = "public/default/testOffload";
- private String storagePath = createStoragePath(topic);
+ private String managedLedgerName = "public/default/persistent/testOffload";
+ private String topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
+ private String storagePath = createStoragePath(managedLedgerName);
private LedgerHandle lh;
private ReadHandle toWrite;
private final int numberOfEntries = 601;
@@ -56,7 +58,7 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
public FileSystemManagedLedgerOffloaderTest() throws Exception {
this.bk = new PulsarMockBookKeeper(scheduler);
this.toWrite = buildReadHandle();
- map.put("ManagedLedgerName", topic);
+ map.put("ManagedLedgerName", managedLedgerName);
}
private ReadHandle buildReadHandle() throws Exception {
@@ -125,10 +127,10 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
offloader.offload(toWrite, uuid, map).get();
LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl)
this.offloaderStats;
- assertTrue(offloaderStats.getOffloadError(topic) == 0);
- assertTrue(offloaderStats.getOffloadBytes(topic) > 0);
- assertTrue(offloaderStats.getReadLedgerLatency(topic).count > 0);
- assertTrue(offloaderStats.getWriteStorageError(topic) == 0);
+ assertTrue(offloaderStats.getOffloadError(topicName) == 0);
+ assertTrue(offloaderStats.getOffloadBytes(topicName) > 0);
+ assertTrue(offloaderStats.getReadLedgerLatency(topicName).count > 0);
+ assertTrue(offloaderStats.getWriteStorageError(topicName) == 0);
ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid,
map).get();
LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
@@ -137,10 +139,10 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
LedgerEntry toTestEntry = toTestIter.next();
}
- assertTrue(offloaderStats.getReadOffloadError(topic) == 0);
- assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0);
- assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0);
- assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0);
+ assertTrue(offloaderStats.getReadOffloadError(topicName) == 0);
+ assertTrue(offloaderStats.getReadOffloadBytes(topicName) > 0);
+ assertTrue(offloaderStats.getReadOffloadDataLatency(topicName).count >
0);
+ assertTrue(offloaderStats.getReadOffloadIndexLatency(topicName).count
> 0);
}
@Test
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index f371e7d9a1a..311568b43da 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.GetOptions;
@@ -44,6 +45,7 @@ public class BlobStoreBackedInputStreamImpl extends
BackedInputStream {
private final int bufferSize;
private LedgerOffloaderStats offloaderStats;
private String managedLedgerName;
+ private String topicName;
private long cursor;
private long bufferOffsetStart;
@@ -71,6 +73,7 @@ public class BlobStoreBackedInputStreamImpl extends
BackedInputStream {
this(blobStore, bucket, key, versionCheck, objectLen, bufferSize);
this.offloaderStats = offloaderStats;
this.managedLedgerName = managedLedgerName;
+ this.topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
}
/**
@@ -110,13 +113,13 @@ public class BlobStoreBackedInputStreamImpl extends
BackedInputStream {
// because JClouds streams the content
// and actually the HTTP call finishes when the stream is
fully read
if (this.offloaderStats != null) {
-
this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
+ this.offloaderStats.recordReadOffloadDataLatency(topicName,
System.nanoTime() - startReadTime,
TimeUnit.NANOSECONDS);
-
this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange -
startRange + 1);
+ this.offloaderStats.recordReadOffloadBytes(topicName,
endRange - startRange + 1);
}
} catch (Throwable e) {
if (null != this.offloaderStats) {
-
this.offloaderStats.recordReadOffloadError(this.managedLedgerName);
+ this.offloaderStats.recordReadOffloadError(this.topicName);
}
throw new IOException("Error reading from BlobStore", e);
}
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 499084ab9cd..f1613d83240 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -45,6 +45,7 @@ import
org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
@@ -261,6 +262,7 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
int retryCount = 3;
OffloadIndexBlock index = null;
IOException lastException = null;
+ String topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
// The following retry is used to avoid to some network issue cause
read index file failure.
// If it can not recovery in the retry, we will throw the exception
and the dispatcher will schedule to
// next read.
@@ -269,7 +271,7 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
while (retryCount-- > 0) {
long readIndexStartTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, indexKey);
- offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+ offloaderStats.recordReadOffloadIndexLatency(topicName,
System.nanoTime() - readIndexStartTime,
TimeUnit.NANOSECONDS);
versionCheck.check(indexKey, blob);
OffloadIndexBlockBuilder indexBuilder =
OffloadIndexBlockBuilder.create();
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index 495a6e2fcb3..1f22cd31e16 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -46,6 +46,7 @@ import
org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
@@ -297,13 +298,14 @@ public class BlobStoreBackedReadHandleImplV2 implements
ReadHandle {
throws IOException {
List<BackedInputStream> inputStreams = new LinkedList<>();
List<OffloadIndexBlockV2> indice = new LinkedList<>();
+ String topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
for (int i = 0; i < indexKeys.size(); i++) {
String indexKey = indexKeys.get(i);
String key = keys.get(i);
log.debug("open bucket: {} index key: {}", bucket, indexKey);
long startTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, indexKey);
- offloaderStats.recordReadOffloadIndexLatency(managedLedgerName,
+ offloaderStats.recordReadOffloadIndexLatency(topicName,
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
log.debug("indexKey blob: {} {}", indexKey, blob);
versionCheck.check(indexKey, blob);
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 606934b5f0e..e6da66a4e8c 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -63,6 +63,7 @@ import
org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.BlobStoreLocation;
import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
@@ -176,7 +177,8 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
public CompletableFuture<Void> offload(ReadHandle readHandle,
UUID uuid,
Map<String, String> extraMetadata) {
- final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
+ final String managedLedgerName =
extraMetadata.get(MANAGED_LEDGER_NAME);
+ final String topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
final BlobStore writeBlobStore =
blobStores.get(config.getBlobStoreLocation());
log.info("offload {} uuid {} extraMetadata {} to {} {}",
readHandle.getId(), uuid, extraMetadata,
config.getBlobStoreLocation(), writeBlobStore);
@@ -226,7 +228,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
.calculateBlockSize(config.getMaxBlockSizeInBytes(),
readHandle, startEntry, entryBytesWritten);
try (BlockAwareSegmentInputStream blockStream = new
BlockAwareSegmentInputStreamImpl(
- readHandle, startEntry, blockSize,
this.offloaderStats, topicName)) {
+ readHandle, startEntry, blockSize,
this.offloaderStats, managedLedgerName)) {
Payload partPayload =
Payloads.newInputStreamPayload(blockStream);
partPayload.getContentMetadata().setContentLength((long) blockSize);
@@ -611,7 +613,8 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
return promise.whenComplete((__, t) -> {
if (null != this.ml) {
- this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(),
t == null);
+ this.offloaderStats.recordDeleteOffloadOps(
+ TopicName.fromPersistenceNamingEncoding(this.ml.getName()),
t == null);
}
});
}
@@ -636,7 +639,8 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
});
return promise.whenComplete((__, t) ->
- this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(),
t == null));
+ this.offloaderStats.recordDeleteOffloadOps(
+ TopicName.fromPersistenceNamingEncoding(this.ml.getName()),
t == null));
}
@Override
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index 3c92051c95d..e35debe634e 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import
org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,7 @@ public class BlockAwareSegmentInputStreamImpl extends
BlockAwareSegmentInputStre
// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry
header and entry content.
private List<ByteBuf> entriesByteBuf = null;
private LedgerOffloaderStats offloaderStats;
+ private String managedLedgerName;
private String topicName;
private int currentOffset = 0;
private final AtomicBoolean close = new AtomicBoolean(false);
@@ -91,7 +93,8 @@ public class BlockAwareSegmentInputStreamImpl extends
BlockAwareSegmentInputStre
LedgerOffloaderStats
offloaderStats, String ledgerName) {
this(ledger, startEntryId, blockSize);
this.offloaderStats = offloaderStats;
- this.topicName = ledgerName;
+ this.managedLedgerName = ledgerName;
+ this.topicName = TopicName.fromPersistenceNamingEncoding(ledgerName);
}
private ByteBuf readEntries(int len) throws IOException {
@@ -183,7 +186,7 @@ public class BlockAwareSegmentInputStreamImpl extends
BlockAwareSegmentInputStre
log.debug("read ledger entries. start: {}, end: {} cost {}",
start, end,
TimeUnit.NANOSECONDS.toMicros(System.nanoTime() -
startTime));
}
- if (offloaderStats != null && topicName != null) {
+ if (offloaderStats != null && managedLedgerName != null) {
offloaderStats.recordReadLedgerLatency(topicName,
System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 6f499d153e2..3b849252258 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -49,6 +49,7 @@ import
org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
import
org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.pulsar.common.naming.TopicName;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.options.CopyOptions;
import org.mockito.Mockito;
@@ -172,10 +173,10 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
LedgerOffloader offloader = getOffloader();
UUID uuid = UUID.randomUUID();
-
- String topic = "test";
+ String managedLegerName = "public/default/persistent/testOffload";
+ String topic =
TopicName.fromPersistenceNamingEncoding(managedLegerName);
Map<String, String> extraMap = new HashMap<>();
- extraMap.put("ManagedLedgerName", topic);
+ extraMap.put("ManagedLedgerName", managedLegerName);
offloader.offload(toWrite, uuid, extraMap).get();
LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl)
this.offloaderStats;
@@ -187,7 +188,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
Map<String, String> map = new HashMap<>();
map.putAll(offloader.getOffloadDriverMetadata());
- map.put("ManagedLedgerName", topic);
+ map.put("ManagedLedgerName", managedLegerName);
ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid,
map).get();
LedgerEntries toTestEntries = toTest.read(0,
toTest.getLastAddConfirmed());
Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();