eolivelli commented on a change in pull request #13833:
URL: https://github.com/apache/pulsar/pull/13833#discussion_r839377115



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java
##########
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.naming.TopicName;
+
+
+/**
+ * Management Bean for a {@link LedgerOffloader}.
+ */
[email protected]
[email protected]
+public final class LedgerOffloaderStats implements Runnable {
+    private static final String TOPIC_LABEL = "topic";
+    private static final String NAMESPACE_LABEL = "namespace";
+    private static final String UNKNOWN = "unknown";
+
+    private final boolean exposeLedgerMetrics;
+    private final boolean exposeTopicLevelMetrics;
+    private final int interval;
+
+    private Counter offloadError;
+    private Gauge offloadRate;
+    private Summary readLedgerLatency;
+    private Counter writeStorageError;
+    private Counter readOffloadError;
+    private Gauge readOffloadRate;
+    private Summary readOffloadIndexLatency;
+    private Summary readOffloadDataLatency;
+
+    private Map<String, String> topic2Namespace;
+    private Map<String, Pair<LongAdder, LongAdder>> 
offloadAndReadOffloadBytesMap;
+
+    private static volatile LedgerOffloaderStats instance;

Review comment:
       please do not use singletons.
   I know it is harder to glue the things but one we add this Singleton we 
won't be able to easily drop it in the future

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -1244,10 +1245,15 @@ public LedgerOffloader 
getManagedLedgerOffloader(NamespaceName namespaceName, Of
     public synchronized LedgerOffloader 
createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
             throws PulsarServerException {
         try {
+            //initialize ledger offloader stats
+            
LedgerOffloaderStats.initialize(config.isExposeManagedLedgerMetricsInPrometheus(),
+                    config.isExposeTopicLevelMetricsInPrometheus(), 
this.executor,
+                    config.getManagedLedgerStatsPeriodSeconds());
             if 
(StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no 
offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
+
                 Offloaders offloaders = offloadersCache.getOrLoadOffloaders(

Review comment:
       we can create a single instance of LedgerOffloaderStats and add a field 
in PulsarService
   
   then you can pass the LedgerOffloaderStats to Offloaders

##########
File path: 
tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -105,6 +108,7 @@ private 
FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedSchedu
         this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                 .name("offload-assignment").build();
+        this.offloaderStats = LedgerOffloaderStats.getInstance();

Review comment:
       please pass this into the constructor 

##########
File path: 
tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
##########
@@ -125,6 +129,7 @@ public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl 
conf,
         this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                 .name("offload-assignment").build();
+        this.offloaderStats = LedgerOffloaderStats.getInstance();

Review comment:
       please pass this into the constructor

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -140,6 +144,7 @@ public static BlobStoreManagedLedgerOffloader 
create(TieredStorageConfiguration
                 config.getBucket(), config.getRegion());
 
         blobStores.putIfAbsent(config.getBlobStoreLocation(), 
config.getBlobStore());
+        this.offloaderStats = LedgerOffloaderStats.getInstance();

Review comment:
       please pass this into the constructor




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to