hangc0276 commented on a change in pull request #13833:
URL: https://github.com/apache/pulsar/pull/13833#discussion_r794289211



##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
##########
@@ -76,6 +80,13 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize,
+                                            LedgerOffloaderMXBeanImpl mxBean, String ledgerName) {
+        this(ledger, startEntryId, blockSize);
+        this.mxBean = mxBean;
+        this.ledgerNameForMetrics = ledgerName;

Review comment:
       We'd better use `topicName` instead of `ledgerNameForMetrics`.
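       A minimal sketch of the rename, mirroring the constructor in the hunk above (the `topicName` field is the suggested replacement, not existing code):

```java
public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize,
                                        LedgerOffloaderMXBeanImpl mxBean, String topicName) {
    this(ledger, startEntryId, blockSize);
    this.mxBean = mxBean;
    this.topicName = topicName; // renamed from ledgerNameForMetrics
}
```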

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -218,12 +228,19 @@ public String getOffloadDriverName() {
                         }
                         entryBytesWritten += blockStream.getBlockEntryBytesCount();
                         partId++;
+                        this.mxBean.recordOffloadBytes(extraMetadata.get(MANAGED_LEDGER_NAME),

Review comment:
       Please use `topicName` to store the value of `extraMetadata.get(MANAGED_LEDGER_NAME)`, to avoid calling it multiple times.
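       For example (a sketch; the second argument to `recordOffloadBytes` is cut off in the hunk above, so `bytesWritten` is a stand-in):

```java
String topicName = extraMetadata.get(MANAGED_LEDGER_NAME); // fetched once, reused for every metric call
this.mxBean.recordOffloadBytes(topicName, bytesWritten);   // bytesWritten stands in for the elided argument
```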

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
##########
@@ -113,11 +124,18 @@ private int readEntries() throws IOException {
 
     private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException {
         long end = Math.min(start + maxNumberEntries - 1, ledger.getLastAddConfirmed());
+        long startTime = System.nanoTime();
         try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, end).get()) {
-            log.debug("read ledger entries. start: {}, end: {}", start, end);
+            if (log.isDebugEnabled()) {
+                log.debug("read ledger entries. start: {}, end: {} cost {}", start, end,
+                        TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime));
+            }
+            if (mxBean != null && ledgerNameForMetrics != null) {
+                mxBean.recordReadLedgerLatency(ledgerNameForMetrics, System.nanoTime() - startTime,
+                        TimeUnit.NANOSECONDS);
+            }
 
             List<ByteBuf> entries = Lists.newLinkedList();
-
             Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();

Review comment:
       We'd better record the bytes we read from BookKeeper; the count should include prefetched entries.
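       Something like the following could go inside the `try` block (a sketch; `recordReadLedgerBytes` is a hypothetical recorder name, and `LedgerEntry.getLength()` is the BookKeeper entry length):

```java
long bytesRead = 0;
for (LedgerEntry entry : ledgerEntriesOnce) {
    bytesRead += entry.getLength(); // counts every returned entry, including prefetched ones
}
if (mxBean != null && ledgerNameForMetrics != null) {
    mxBean.recordReadLedgerBytes(ledgerNameForMetrics, bytesRead); // hypothetical counter
}
```

       If `LedgerEntries` cannot be iterated twice, the same sum can be accumulated in the existing iteration loop instead.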

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -262,6 +286,9 @@ public String getOffloadDriverName() {
                     log.error("Failed deleteObject in bucket - {} with key - 
{}.",
                             config.getBucket(), dataBlockKey, throwable);
                 }
+
+                this.mxBean.recordWriteToStorageError(extraMetadata.get(MANAGED_LEDGER_NAME));

Review comment:
       The exception may be thrown during the read-from-BookKeeper stage, so we can't treat it as `recordWriteToStorageError`. The same applies above.
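       A sketch of the distinction (the `BKException` check and `recordReadLedgerError` are assumptions, not existing API):

```java
if (throwable instanceof BKException) {
    this.mxBean.recordReadLedgerError(topicName);      // hypothetical: failure came from the BookKeeper read
} else {
    this.mxBean.recordWriteToStorageError(topicName);  // only count genuine storage-write failures
}
```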

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -233,7 +242,10 @@ public static ReadHandle open(ScheduledExecutorService executor,
         // If we use a backoff to control the retry, it will introduce a concurrent operation.
         // We don't want to make it complicated, because in the most of case it shouldn't in the retry loop.
         while (retryCount-- > 0) {
+            long readIndexStartTime = System.nanoTime();

Review comment:
       We'd better record the retry count.
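       For example (a sketch; `recordReadOffloadIndexRetry` is a hypothetical counter-style recorder):

```java
int attempts = 0;
while (retryCount-- > 0) {
    attempts++;
    // existing open/read logic goes here; break on success
}
if (mxBean != null && attempts > 1) {
    mxBean.recordReadOffloadIndexRetry(managedLedgerName, attempts - 1); // hypothetical counter
}
```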

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -208,7 +225,10 @@ public void run() {
                     long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1,
                             readHandle.getLastAddConfirmed());
                     log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
+                    long startReadTime = System.nanoTime();
                     LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+                    this.mxBean.recordReadLedgerLatency(extraMetadata.get(MANAGED_LEDGER_NAME),
+                            System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);

Review comment:
       Please use `topicName` to store the value of `extraMetadata.get(MANAGED_LEDGER_NAME)`, to avoid calling it multiple times.
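       For example (a sketch reusing the call from the hunk above):

```java
String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
this.mxBean.recordReadLedgerLatency(topicName, System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
```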

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -145,6 +151,7 @@ public LedgerMetadata getLedgerMetadata() {
                         }
                         entriesToRead--;
                         nextExpectedId++;
+                        this.mxBean.recordReadOffloadBytes(managedLedgerName, length);

Review comment:
       We'd better record the seek count using a counter type. The total read latency is also needed.
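       A sketch of both suggestions (`seekWasNeeded` is a stand-in for the existing seek branch, and both recorder names are hypothetical):

```java
long readStart = System.nanoTime();
if (seekWasNeeded) {
    this.mxBean.recordReadOffloadSeeks(managedLedgerName, 1);  // hypothetical counter
}
// ... after the read loop completes:
this.mxBean.recordReadOffloadTotalLatency(managedLedgerName,
        System.nanoTime() - readStart, TimeUnit.NANOSECONDS);  // hypothetical latency histogram
```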

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -255,9 +267,9 @@ public static ReadHandle open(ScheduledExecutorService executor,
         BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
                 versionCheck,
                 index.getDataObjectLength(),
-                readBufferSize);
+                readBufferSize, mxBean, managedLedgerName);
 
-        return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
+        return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor,  mxBean, managedLedgerName);

Review comment:
       Remove the double space before `mxBean`.

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -299,10 +324,15 @@ public void run() {
                         dataWriter.append(key, value);
                     } catch (IOException e) {
                         ledgerReader.fileSystemWriteException = e;
+                        ledgerReader.mxBean.recordWriteToStorageError(managedLedgerName);
                         break;
                     }
                     haveOffloadEntryNumber.incrementAndGet();
+                    ledgerReader.mxBean.recordOffloadBytes(managedLedgerName, entry.getEntryBytes().length);

Review comment:
       Please use `entry.getLength()` instead of `entry.getEntryBytes().length`.
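       i.e. something like:

```java
ledgerReader.mxBean.recordOffloadBytes(managedLedgerName, entry.getLength());
```

       This avoids materializing the entry's byte array just to read its length.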

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -299,10 +324,15 @@ public void run() {
                         dataWriter.append(key, value);
                     } catch (IOException e) {
                         ledgerReader.fileSystemWriteException = e;
+                        ledgerReader.mxBean.recordWriteToStorageError(managedLedgerName);
                         break;
                     }
                     haveOffloadEntryNumber.incrementAndGet();
+                    ledgerReader.mxBean.recordOffloadBytes(managedLedgerName, entry.getEntryBytes().length);
                 }
+                long writeEntryTimeInNs = System.nanoTime() - start;
+                ledgerReader.mxBean.recordWriteToStorageLatency(managedLedgerName, writeEntryTimeInNs,
+                        TimeUnit.NANOSECONDS);

Review comment:
       Should we record the byte count of each upload batch? We could use a counter type.
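       A sketch of per-batch accumulation (one counter update per batch instead of one per entry):

```java
long batchBytes = 0;
for (LedgerEntry entry : ledgerEntriesOnce) {
    // existing append logic goes here
    batchBytes += entry.getLength();
}
ledgerReader.mxBean.recordOffloadBytes(managedLedgerName, batchBytes); // one update per upload batch
```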

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -208,7 +225,10 @@ public void run() {
                     long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1,
                             readHandle.getLastAddConfirmed());
                     log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
+                    long startReadTime = System.nanoTime();
                     LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+                    this.mxBean.recordReadLedgerLatency(extraMetadata.get(MANAGED_LEDGER_NAME),
+                            System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);

Review comment:
       Please record the read throughput from BookKeeper.
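       For example (a sketch; `recordReadLedgerBytes` is a hypothetical counter — throughput can then be derived from the bytes counter and the latency recorded above):

```java
long readBytes = 0;
for (LedgerEntry entry : ledgerEntriesOnce) {
    readBytes += entry.getLength();
}
this.mxBean.recordReadLedgerBytes(extraMetadata.get(MANAGED_LEDGER_NAME), readBytes);
```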

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -253,7 +272,12 @@ public String getOffloadDriverName() {
                         .contentLength((long) indexStream.getStreamSize())
                     .build();
 
+                long startUploadTime = System.nanoTime();
                 writeBlobStore.putBlob(config.getBucket(), blob);
+                long cost = System.nanoTime() - startUploadTime;
+                String topicName = extraMetadata.get(MANAGED_LEDGER_NAME);
+                this.mxBean.recordOffloadTime(topicName, cost, TimeUnit.MILLISECONDS);

Review comment:
       The offload time should contain both the data-offload time and the index-offload time, recorded once rather than separately for data and index; otherwise the offload latency will jitter all the time.
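       A sketch of recording once across both phases (`uploadDataBlocks` and `uploadIndexBlock` are stand-ins for the two existing upload steps; note the elapsed value is in nanoseconds, so the `TimeUnit` argument should match it):

```java
long offloadStart = System.nanoTime();
uploadDataBlocks();  // stand-in: the existing multipart data upload
uploadIndexBlock();  // stand-in: the existing index putBlob
this.mxBean.recordOffloadTime(topicName, System.nanoTime() - offloadStart, TimeUnit.NANOSECONDS);
```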

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
##########
@@ -73,7 +87,12 @@ private boolean refillBufferIfNeeded() throws IOException {
                                      objectLen - 1);
 
             try {
+                long startReadTime = System.nanoTime();
                 Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
+                if (this.mxBean != null) {
+                    this.mxBean.recordReadOffloadDataLatency(managedLedgerName,
+                            System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
+                }
                 versionCheck.check(key, blob);
 
                 try (InputStream stream = blob.getPayload().openStream()) {

Review comment:
       We'd better record the bytes refilled from tiered storage.
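       For example (a sketch; assumes the internal `buffer` is a Netty `ByteBuf`, as in the existing implementation, and reuses the `recordReadOffloadBytes` recorder from this PR):

```java
try (InputStream stream = blob.getPayload().openStream()) {
    int bytesRefilled = buffer.writeBytes(stream, (int) (endRange - startRange + 1));
    if (bytesRefilled > 0 && this.mxBean != null) {
        this.mxBean.recordReadOffloadBytes(managedLedgerName, bytesRefilled); // bytes fetched from tiered storage
    }
}
```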

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
##########
@@ -59,6 +63,16 @@ public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String
         this.bufferOffsetStart = this.bufferOffsetEnd = -1;
     }
 
+
+    public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key,

Review comment:
       We'd better record the `read` latency.
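       A sketch of timing the stream's `read()` (the body approximates the existing implementation; reusing the `recordReadOffloadDataLatency` recorder here is an assumption):

```java
@Override
public int read() throws IOException {
    long start = System.nanoTime();
    if (!refillBufferIfNeeded()) {
        return -1;
    }
    int b = buffer.readByte() & 0xFF;
    if (mxBean != null) {
        mxBean.recordReadOffloadDataLatency(managedLedgerName,
                System.nanoTime() - start, TimeUnit.NANOSECONDS);
    }
    return b;
}
```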

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderMXBeanImpl.java
##########
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.bookkeeper.mledger.LedgerOffloaderMXBean;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+
+public class LedgerOffloaderMXBeanImpl implements LedgerOffloaderMXBean {
+
+    private static final int DEFAULT_SIZE = 4;
+    public static final long[] READ_ENTRY_LATENCY_BUCKETS_USEC = {500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+            200_000, 1000_000};
+
+    private final String driverName;
+
+    // offloadTimeMap records the time cost of one offload round
+    private final ConcurrentHashMap<String, LongAdder> offloadTimeMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+    // offloadErrorMap records the errors that occurred
+    private final ConcurrentHashMap<String, LongAdder> offloadErrorMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+    // offloadBytesMap records the offloaded bytes
+    private final ConcurrentHashMap<String, LongAdder> offloadBytesMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+
+
+    // readLedgerLatencyBucketsMap records the time cost of ledger reads
+    private final ConcurrentHashMap<String, StatsBuckets> readLedgerLatencyBucketsMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    // writeToStorageLatencyBucketsMap records the time cost of writes to storage
+    private final ConcurrentHashMap<String, StatsBuckets> writeToStorageLatencyBucketsMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    // writeToStorageErrorMap records the errors that occurred while writing to storage
+    private final ConcurrentHashMap<String, LongAdder> writeToStorageErrorMap = new ConcurrentHashMap<>();
+
+
+    // streamingWriteToStorageBytesMap and streamingWriteToStorageErrorMap are for streaming offload
+    private final ConcurrentHashMap<String, LongAdder> streamingWriteToStorageBytesMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    private final ConcurrentHashMap<String, LongAdder> streamingWriteToStorageErrorMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+
+    // readOffloadIndexLatencyBucketsMap and readOffloadDataLatencyBucketsMap are latency metrics for index and data
+    // readOffloadDataBytesMap and readOffloadErrorMap are for reading offloaded data
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadIndexLatencyBucketsMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadDataLatencyBucketsMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    private final ConcurrentHashMap<String, LongAdder> readOffloadDataBytesMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+    private final ConcurrentHashMap<String, LongAdder> readOffloadErrorMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+
+    public LedgerOffloaderMXBeanImpl(String driverName) {
+        this.driverName = driverName;
+    }
+
+    @Override
+    public String getDriverName() {
+        return this.driverName;
+    }
+
+    @Override
+    public long getOffloadTime(String topic) {
+        LongAdder offloadTime = this.offloadTimeMap.remove(topic);
+        return null == offloadTime ? 0L : offloadTime.sum();
+    }
+
+    @Override
+    public long getOffloadErrors(String topic) {
+        LongAdder errors = this.offloadErrorMap.remove(topic);
+        return null == errors ? 0L : errors.sum();
+    }
+
+    @Override
+    public long getOffloadBytes(String topic) {
+        LongAdder offloadBytes = this.offloadBytesMap.remove(topic);
+        return null == offloadBytes ? 0L : offloadBytes.sum();
+    }
+
+    @Override
+    public StatsBuckets getReadLedgerLatencyBuckets(String topic) {
+        StatsBuckets buckets = this.readLedgerLatencyBucketsMap.remove(topic);
+        if (null != buckets) {
+            buckets.refresh();
+        }
+        return buckets;
+    }
+
+    @Override
+    public StatsBuckets getWriteToStorageLatencyBuckets(String topic) {
+        StatsBuckets buckets = this.writeToStorageLatencyBucketsMap.remove(topic);
+        if (null != buckets) {
+            buckets.refresh();
+        }
+        return buckets;
+    }
+
+    @Override
+    public long getWriteToStorageErrors(String topic) {
+        LongAdder errors = this.writeToStorageErrorMap.remove(topic);
+        return null == errors ? 0L : errors.sum();
+    }
+
+    @Override
+    public long getStreamingWriteToStorageBytes(String topic) {
+        LongAdder bytes = this.streamingWriteToStorageBytesMap.remove(topic);
+        return null == bytes ? 0L : bytes.sum();
+    }
+
+    @Override
+    public long getStreamingWriteToStorageErrors(String topic) {
+        LongAdder errors = this.streamingWriteToStorageErrorMap.remove(topic);
+        return null == errors ? 0L : errors.sum();
+    }
+
+
+    @Override
+    public StatsBuckets getReadOffloadIndexLatencyBuckets(String topic) {
+        StatsBuckets buckets = this.readOffloadIndexLatencyBucketsMap.remove(topic);
+        if (null != buckets) {
+            buckets.refresh();
+        }
+        return buckets;
+    }
+
+    @Override
+    public StatsBuckets getReadOffloadDataLatencyBuckets(String topic) {
+        StatsBuckets buckets = this.readOffloadDataLatencyBucketsMap.remove(topic);
+        if (null != buckets) {
+            buckets.refresh();
+        }
+        return buckets;
+    }
+
+    @Override
+    public long getReadOffloadBytes(String topic) {
+        LongAdder bytes = this.readOffloadDataBytesMap.remove(topic);
+        return null == bytes ? 0L : bytes.sum();
+    }
+
+    @Override
+    public long getReadOffloadErrors(String topic) {
+        LongAdder errors = this.readOffloadErrorMap.remove(topic);
+        return null == errors ? 0L : errors.sum();
+    }
+
+    public void recordOffloadTime(String topicName, long time, TimeUnit unit) {
+        if (topicName == null) {
+            return;
+        }
+        LongAdder adder = offloadTimeMap.computeIfAbsent(topicName, k -> new LongAdder());
+        adder.add(unit.toMillis(time));
+    }
+
+
+    public void recordOffloadError(String topicName) {
+        if (topicName == null) {

Review comment:
       Use `StringUtils.isBlank(topicName)` instead of `topicName == null`. The same applies below.
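       i.e. (a sketch; `StringUtils` comes from commons-lang3, and the increment follows the pattern of the other recorders in this class):

```java
import org.apache.commons.lang3.StringUtils;

public void recordOffloadError(String topicName) {
    if (StringUtils.isBlank(topicName)) {
        return;
    }
    offloadErrorMap.computeIfAbsent(topicName, k -> new LongAdder()).increment();
}
```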




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
