gaoran10 commented on a change in pull request #13833: URL: https://github.com/apache/pulsar/pull/13833#discussion_r790582048
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderMXBean.java ##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+
+
+/**
+ * Management Bean for a {@link LedgerOffloader}.
+ */
[email protected]
[email protected]
+public interface LedgerOffloaderMXBean {
+
+    /**
+     * The ledger offloader name.
+     *
+     * @return ledger offloader name.
+     */
+    String getDriverName();
+
+    /**
+     * Record the offload total time.
+     *
+     * @return offload time per topic.
+     */
+    long getOffloadTime(String topic);
+
+    /**
+     * Record the offload error count.
+     *
+     * @return offload errors per topic.
+     */
+    long getOffloadErrors(String topic);
+
+    /**
+     * Record the offload rate to storage.
+     *
+     * @return offload rate per topic.
+     */
+    long getOffloadBytes(String topic);

Review comment:
Change the name to `getOffloadRate`?
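A value only becomes a rate once the byte count is divided by the collection interval. In `LedgerOffloaderMXBeanImpl` (quoted further down in this thread), each getter removes the topic's entry on read, so every call returns the bytes accumulated since the previous call. The sketch below shows what a true `getOffloadRate` could look like under the assumption that the caller knows the scrape interval. The class `OffloadRateSketch` and the three-argument `getOffloadRate` are hypothetical and are not part of the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch; the PR's interface only exposes getOffloadBytes(String).
class OffloadRateSketch {
    private final ConcurrentHashMap<String, LongAdder> offloadBytesMap = new ConcurrentHashMap<>();

    // Accumulate bytes per topic, following the computeIfAbsent pattern used in the PR.
    void recordOffloadBytes(String topic, long bytes) {
        if (topic == null) {
            return;
        }
        offloadBytesMap.computeIfAbsent(topic, k -> new LongAdder()).add(bytes);
    }

    // Drain the counter (read-and-reset, like getOffloadBytes in the PR) and divide
    // by the assumed collection interval to turn a byte count into bytes per second.
    double getOffloadRate(String topic, long interval, TimeUnit unit) {
        LongAdder bytes = offloadBytesMap.remove(topic);
        long total = bytes == null ? 0L : bytes.sum();
        double seconds = unit.toNanos(interval) / 1_000_000_000.0;
        return seconds <= 0 ? 0.0 : total / seconds;
    }
}
```

An alternative is to keep the name `getOffloadBytes` and let the monitoring system derive the rate. That only works if the value is exposed as a monotonically increasing counter, which the remove-on-read getters do not provide.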
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderMXBean.java ##########
@@ -0,0 +1,123 @@
+    long getOffloadBytes(String topic);
+
+    /**
+     * Record the read ledger latency.
+     *
+     * @return read ledger latency per topic.
+     */
+    StatsBuckets getReadLedgerLatencyBuckets(String topic);
+
+    /**
+     * Record the write latency to tiered storage.
+     *
+     * @return write to storage latency per topic.
+     */
+    StatsBuckets getWriteToStorageLatencyBuckets(String topic);
+
+    /**
+     * Record the write to storage error count.
+     *
+     * @return write to storage errors per topic.
+     */
+    long getWriteToStorageErrors(String topic);
+
+    /**
+     * Record read offload index latency.
+     *
+     * @return read offload index latency per topic.
+     */
+    StatsBuckets getReadOffloadIndexLatencyBuckets(String topic);
+
+    /**
+     * Record read offload data latency.
+     *
+     * @return read offload data latency per topic.
+     */
+    StatsBuckets getReadOffloadDataLatencyBuckets(String topic);
+
+    /**
+     * Record read offload method rate.
+     *
+     * @return read offload data rate.
+     */
+    long getReadOffloadBytes(String topic);
+
+    /**
+     * Record read offload error count.
+     *
+     * @return read offload data errors.
+     */
+    long getReadOffloadErrors(String topic);
+
+    /**
+     * Record streaming read offload method rate.
+     *
+     * @return streaming write to storage rate per topic.
+     */
+    long getStreamingWriteToStorageBytes(String topic);
+
+    /**
+     * Record streaming read offload error count.
+     *
+     * @return streaming write to storage errors per topic.
+     */
+    long getStreamingWriteToStorageErrors(String topic);

Review comment:
Could we merge this with `getWriteToStorageErrors`?

########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderMXBean.java ##########
@@ -0,0 +1,123 @@
+    /**
+     * Record streaming read offload method rate.
+     *
+     * @return streaming write to storage rate per topic.
+     */
+    long getStreamingWriteToStorageBytes(String topic);

Review comment:
Could we merge this with `getWriteToStorageLatencyBuckets`?

########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderMXBean.java ##########
@@ -0,0 +1,123 @@
+public interface LedgerOffloaderMXBean {
+
+    /**
+     * The ledger offloader name.

Review comment:
```suggestion
     * The offload driver name.
```

########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderMXBean.java ##########
@@ -0,0 +1,123 @@
+    /**
+     * Record the write latency to tiered storage.
+     *
+     * @return write to storage latency per topic.
+     */
+    StatsBuckets getWriteToStorageLatencyBuckets(String topic);

Review comment:
Do we need to add `getWriteIndexToStorageLatencyBuckets`?

########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java ##########
@@ -160,6 +169,46 @@ private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextO
                 clusterName, Collector.Type.GAUGE, stream);
     }
+
+    private static void generateLedgerOffloaderMetrics(PulsarService pulsar, SimpleTextOutputStream stream,
+                                                       boolean includeTopicMetrics) {
+        List<Metrics> metricList = new LinkedList<>();
+        if (includeTopicMetrics) {
+            pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
+                bundlesMap.forEach((bundle, topicsMap) -> {
+                    topicsMap.forEach((topicName, topic) -> {
+                        if (topic instanceof PersistentTopic) {
+                            try {
+                                List<Metrics> metrics =

Review comment:
Could we check the ledgerOffloader of the topic first?

########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java ##########
@@ -200,13 +249,16 @@ private static void parseMetricsToPrometheusMetrics(Collection<Metrics> metrics,
                         .write("{cluster=\"").write(cluster).write('"');
             }
+            //to avoid quantile label duplicated
+            boolean appendedQuantile = false;

Review comment:
This is an existing bug, right?
I'm not sure why the `quantile` label would be duplicated.

########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderMXBeanImpl.java ##########
@@ -0,0 +1,264 @@
+package org.apache.bookkeeper.mledger.impl;
+
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.bookkeeper.mledger.LedgerOffloaderMXBean;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+
+public class LedgerOffloaderMXBeanImpl implements LedgerOffloaderMXBean {
+
+    private static final int DEFAULT_SIZE = 4;
+    public static final long[] READ_ENTRY_LATENCY_BUCKETS_USEC = {500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+            200_000, 1000_000};
+
+    private final String driverName;
+
+    // offloadTimeMap record the time cost by one round offload
+    private final ConcurrentHashMap<String, LongAdder> offloadTimeMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+    // offloadErrorMap record error ocurred
+    private final ConcurrentHashMap<String, LongAdder> offloadErrorMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+    // offloadRateMap record the offload rate
+    private final ConcurrentHashMap<String, LongAdder> offloadBytesMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+
+
+    // readLedgerLatencyBucketsMap record the time cost by ledger read
+    private final ConcurrentHashMap<String, StatsBuckets> readLedgerLatencyBucketsMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    // writeToStorageLatencyBucketsMap record the time cost by write to storage
+    private final ConcurrentHashMap<String, StatsBuckets> writeToStorageLatencyBucketsMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    // writeToStorageErrorMap record the error occurred in write storage
+    private final ConcurrentHashMap<String, LongAdder> writeToStorageErrorMap = new ConcurrentHashMap<>();
+
+
+    // streamingWriteToStorageRateMap and streamingWriteToStorageErrorMap is for streamingOffload
+    private final ConcurrentHashMap<String, LongAdder> streamingWriteToStorageBytesMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    private final ConcurrentHashMap<String, LongAdder> streamingWriteToStorageErrorMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+
+    // readOffloadIndexLatencyBucketsMap and readOffloadDataLatencyBucketsMap are latency metrics about index and data
+    // readOffloadDataRateMap and readOffloadErrorMap is for reading offloaded data
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadIndexLatencyBucketsMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    private final ConcurrentHashMap<String, StatsBuckets> readOffloadDataLatencyBucketsMap = new ConcurrentHashMap<>(
+            DEFAULT_SIZE);
+    private final ConcurrentHashMap<String, LongAdder> readOffloadDataBytesMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+    private final ConcurrentHashMap<String, LongAdder> readOffloadErrorMap = new ConcurrentHashMap<>(DEFAULT_SIZE);
+
+    public LedgerOffloaderMXBeanImpl(String driverName) {
+        this.driverName = driverName;
+    }
+
+    @Override
+    public String getDriverName() {
+        return this.driverName;
+    }
+
+    @Override
+    public long getOffloadTime(String topic) {
+        LongAdder offloadTime = this.offloadTimeMap.remove(topic);
+        return null == offloadTime ? 0L : offloadTime.sum();
+    }
+
+    @Override
+    public long getOffloadErrors(String topic) {
+        LongAdder errors = this.offloadErrorMap.remove(topic);
+        return null == errors ? 0L : errors.sum();
+    }
+
+    @Override
+    public long getOffloadBytes(String topic) {
+        LongAdder offloadBytes = this.offloadBytesMap.remove(topic);
+        return null == offloadBytes ? 0L : offloadBytes.sum();
+    }
+
+    @Override
+    public StatsBuckets getReadLedgerLatencyBuckets(String topic) {
+        StatsBuckets buckets = this.readLedgerLatencyBucketsMap.remove(topic);
+        if (null != buckets) {
+            buckets.refresh();
+        }
+        return buckets;
+    }
+
+    @Override
+    public StatsBuckets getWriteToStorageLatencyBuckets(String topic) {
+        StatsBuckets buckets = this.writeToStorageLatencyBucketsMap.remove(topic);
+        if (null != buckets) {
+            buckets.refresh();
+        }
+        return buckets;
+    }
+
+    @Override
+    public long getWriteToStorageErrors(String topic) {
+        LongAdder errors = this.writeToStorageErrorMap.remove(topic);
+        return null == errors ? 0L : errors.sum();
+    }
+
+    @Override
+    public long getStreamingWriteToStorageBytes(String topic) {
+        LongAdder bytes = this.streamingWriteToStorageBytesMap.remove(topic);
+        return null == bytes ? 0L : bytes.sum();
+    }
+
+    @Override
+    public long getStreamingWriteToStorageErrors(String topic) {
+        LongAdder errors = this.streamingWriteToStorageErrorMap.remove(topic);
+        return null == errors ? 0L : errors.sum();
+    }
+
+
+    @Override
+    public StatsBuckets getReadOffloadIndexLatencyBuckets(String topic) {
+        StatsBuckets buckets = this.readOffloadIndexLatencyBucketsMap.remove(topic);
+        if (null != buckets) {
+            buckets.refresh();
+        }
+        return buckets;
+    }
+
+    @Override
+    public StatsBuckets getReadOffloadDataLatencyBuckets(String topic) {
+        StatsBuckets buckets = this.readOffloadDataLatencyBucketsMap.remove(topic);
+        if (null != buckets) {
+            buckets.refresh();
+        }
+        return buckets;
+    }
+
+    @Override
+    public long getReadOffloadBytes(String topic) {
+        LongAdder bytes = this.readOffloadDataBytesMap.remove(topic);
+        return null == bytes ? 0L : bytes.sum();
+    }
+
+    @Override
+    public long getReadOffloadErrors(String topic) {
+        LongAdder errors = this.readOffloadErrorMap.remove(topic);
+        return null == errors ? 0L : errors.sum();
+    }
+
+    public void recordOffloadTime(String topicName, long time, TimeUnit unit) {
+        if (topicName == null) {
+            return;
+        }
+        LongAdder adder = offloadTimeMap.computeIfAbsent(topicName, k -> new LongAdder());
+        adder.add(unit.toMillis(time));

Review comment:
Do we need to add a remove method to clean up the map after the topic is deleted?

########## File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java ##########
@@ -218,12 +228,19 @@ public String getOffloadDriverName() {
                     }
                     entryBytesWritten += blockStream.getBlockEntryBytesCount();
                     partId++;
+                    this.mxBean.recordOffloadBytes(extraMetadata.get(MANAGED_LEDGER_NAME),
+                            blockStream.getBlockEntryBytesCount());
                 }
                 dataObjectLength += blockSize;
             }
+            long startUploadTime = System.nanoTime();
             writeBlobStore.completeMultipartUpload(mpu, parts);
+            this.mxBean.recordWriteToStorageLatency(extraMetadata.get(MANAGED_LEDGER_NAME),
+                    System.nanoTime() - startUploadTime, TimeUnit.NANOSECONDS);
+            this.mxBean.recordOffloadTime(extraMetadata.get(MANAGED_LEDGER_NAME),
+                    System.nanoTime() - start, TimeUnit.NANOSECONDS);

Review comment:
Maybe we should record the offload time once, after writing the index.

########## File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java ##########
@@ -145,6 +151,7 @@ public LedgerMetadata getLedgerMetadata() {
                     }
                     entriesToRead--;
                     nextExpectedId++;
+                    this.mxBean.recordReadOffloadBytes(managedLedgerName, length);

Review comment:
Maybe it's better to record the read offload bytes after `blobStore.getBlob`.
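The cleanup question on `recordOffloadTime` could be addressed by a single method that drops a topic's entry from every per-topic map when the topic is deleted. The sketch below assumes a hypothetical `removeTopic(String)` hook that the broker would call on topic deletion. It is a trimmed-down stand-in with only two of the maps; a real version would need to register every per-topic map in `LedgerOffloaderMXBeanImpl`.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, trimmed-down stand-in for LedgerOffloaderMXBeanImpl (two maps only).
class OffloaderStatsCleanupSketch {
    private final ConcurrentHashMap<String, LongAdder> offloadTimeMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, LongAdder> offloadErrorMap = new ConcurrentHashMap<>();

    // Every per-topic map is registered once here, so removeTopic cannot miss one.
    private final List<ConcurrentHashMap<String, ?>> perTopicMaps = List.of(offloadTimeMap, offloadErrorMap);

    void recordOffloadTime(String topicName, long time, TimeUnit unit) {
        if (topicName == null) {
            return;
        }
        offloadTimeMap.computeIfAbsent(topicName, k -> new LongAdder()).add(unit.toMillis(time));
    }

    void recordOffloadError(String topicName) {
        if (topicName == null) {
            return;
        }
        offloadErrorMap.computeIfAbsent(topicName, k -> new LongAdder()).increment();
    }

    // Hypothetical hook: drop all stats of a deleted topic so the maps do not grow forever.
    void removeTopic(String topicName) {
        if (topicName == null) {
            return;
        }
        for (ConcurrentHashMap<String, ?> map : perTopicMaps) {
            map.remove(topicName);
        }
    }

    boolean hasStats(String topicName) {
        return perTopicMaps.stream().anyMatch(m -> m.containsKey(topicName));
    }
}
```

Note that a `record*` call racing with `removeTopic` can re-create an entry for the deleted topic. This sketch does not guard against that.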
########## File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java ##########
@@ -218,12 +228,19 @@ public String getOffloadDriverName() {
+            long startUploadTime = System.nanoTime();
             writeBlobStore.completeMultipartUpload(mpu, parts);
+            this.mxBean.recordWriteToStorageLatency(extraMetadata.get(MANAGED_LEDGER_NAME),

Review comment:
Each part has its own upload time, right? I'm not sure whether the parts are uploaded synchronously or asynchronously, or how this duration should be described.
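To make the per-part question concrete, consider the case where each part is uploaded synchronously inside the loop. Then its latency can be measured around that call, the `completeMultipartUpload` request can be timed separately, and the overall offload time can be recorded once after the index is written, as suggested in the earlier comment on this file. The sketch below works under that synchronous-upload assumption. `PartTimingSketch`, `uploadPart`, `completeUpload` and `writeIndex` are hypothetical stand-ins, not the jclouds `BlobStore` API or the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical sketch, not the jclouds BlobStore API or the PR's code.
// Assumes parts are uploaded synchronously, one after another.
class PartTimingSketch {
    final List<Long> partLatenciesNanos = new ArrayList<>();
    long completeUploadLatencyNanos;
    long totalOffloadNanos;

    void offload(int parts, IntConsumer uploadPart, Runnable completeUpload, Runnable writeIndex) {
        long start = System.nanoTime();
        for (int partId = 1; partId <= parts; partId++) {
            // Per-part latency: only meaningful because the upload call blocks here.
            long partStart = System.nanoTime();
            uploadPart.accept(partId);
            partLatenciesNanos.add(System.nanoTime() - partStart);
        }
        // The final "complete multipart upload" request is timed on its own.
        long completeStart = System.nanoTime();
        completeUpload.run();
        completeUploadLatencyNanos = System.nanoTime() - completeStart;

        writeIndex.run();
        // Total offload time is recorded exactly once, after the index is written.
        totalOffloadNanos = System.nanoTime() - start;
    }
}
```

If the parts turn out to be uploaded asynchronously, the start time would have to be captured when each part is submitted, and the latency recorded in its completion callback. In that case, summing the per-part latencies would overstate the wall-clock upload time.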
