anujmodi2021 commented on code in PR #8137:
URL: https://github.com/apache/hadoop/pull/8137#discussion_r2642758374


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -71,7 +71,14 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = 
"fs.azure.account.key";
   public static final String FS_AZURE_METRIC_ACCOUNT_NAME = 
"fs.azure.metric.account.name";
   public static final String FS_AZURE_METRIC_ACCOUNT_KEY = 
"fs.azure.metric.account.key";
-  public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri";
+  public static final String FS_AZURE_METRIC_FORMAT = "fs.azure.metric.format";

Review Comment:
   Add javadoc for all of these with the {@value} tag.
   



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -71,7 +71,14 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = 
"fs.azure.account.key";
   public static final String FS_AZURE_METRIC_ACCOUNT_NAME = 
"fs.azure.metric.account.name";
   public static final String FS_AZURE_METRIC_ACCOUNT_KEY = 
"fs.azure.metric.account.key";
-  public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri";
+  public static final String FS_AZURE_METRIC_FORMAT = "fs.azure.metric.format";

Review Comment:
   Keep all the metric-related config names consistent, with the same prefix.
   `fs.azure.metrics....`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -136,6 +136,13 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = false;
   public static final int DEFAULT_METRIC_IDLE_TIMEOUT_MS = 60_000;
   public static final int DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS = 60_000;
+  public static final boolean DEFAULT_METRICS_COLLECTION_ENABLED = true;
+  public static final boolean DEFAULT_SHOULD_EMIT_METRICS_ON_IDLE_TIME = false;
+  public static final long DEFAULT_METRICS_EMIT_THRESHOLD = 100_000L;
+  public static final long DEFAULT_METRICS_EMIT_THRESHOLD_INTERVAL_SECS = 60;

Review Comment:
   Generally, names here also contain _FS_. The format is the exact config variable 
name prefixed with DEFAULT_.
   
   I know this isn't followed everywhere but let's try to resolve this here.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -217,11 +233,13 @@ private AbfsClient(final URL baseUrl,
       final AbfsConfiguration abfsConfiguration,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext,
+      final String fileSystemId,

Review Comment:
   I think it's better to pass it as part of the client context, similar to other 
client-related fields.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -62,476 +63,536 @@
  * This class is responsible for tracking and updating metrics related to 
reading footers in files.
  */
 public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbfsReadFooterMetrics.class);
-    private static final String FOOTER_LENGTH = "20";
-    private static final List<FileType> FILE_TYPE_LIST =
-            Arrays.asList(FileType.values());
-    private final Map<String, FileTypeMetrics> fileTypeMetricsMap =
-            new ConcurrentHashMap<>();
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsReadFooterMetrics.class);
+
+  private static final String FOOTER_LENGTH = "20";

Review Comment:
   Add comment as to why 20?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBackoffMetrics.java:
##########
@@ -75,18 +74,24 @@
  * retry operations in Azure Blob File System (ABFS).
  */
 public class AbfsBackoffMetrics extends AbstractAbfsStatisticsSource {
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbfsBackoffMetrics.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsBackoffMetrics.class);
+
   private static final List<RetryValue> RETRY_LIST = Arrays.asList(
-          RetryValue.values());
+      RetryValue.values());
+
+  private final boolean isRetryMetricEnabled;

Review Comment:
   What is the purpose of this?
   Please add javadocs wherever necessary.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -281,42 +303,84 @@ private AbfsClient(final URL baseUrl,
         new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease 
Ops").setDaemon(true).build();
     this.executorService = MoreExecutors.listeningDecorator(
         
HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(),
 tf));
-    this.metricFormat = abfsConfiguration.getMetricFormat();
+    
this.isMetricCollectionEnabled.set(abfsConfiguration.isMetricsCollectionEnabled());
     this.isMetricCollectionStopped = new AtomicBoolean(false);
     this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout();
     this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout();
-    if (StringUtils.isNotEmpty(metricFormat.toString())) {
-      String metricAccountName = abfsConfiguration.getMetricAccount();
-      String metricAccountKey = abfsConfiguration.getMetricAccountKey();
-      if (StringUtils.isNotEmpty(metricAccountName) && 
StringUtils.isNotEmpty(metricAccountKey)) {
-        isMetricCollectionEnabled = true;
-        abfsCounters.initializeMetrics(metricFormat);
-        int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
-        if (dotIndex <= 0) {
-          throw new InvalidUriException(
-              metricAccountName + " - account name is not fully qualified.");
+    if (isMetricCollectionEnabled()) {

Review Comment:
   I feel like a lot of metric-related business logic is there in the client 
constructor.
   Let's create a separate MetricsManager class for AbfsClient which will 
encapsulate all the timers and schedulers for metrics and expose certain public 
methods for the client to call, like init, close, etc., as needed.
   
   This way code will look much cleaner



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -281,42 +303,84 @@ private AbfsClient(final URL baseUrl,
         new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease 
Ops").setDaemon(true).build();
     this.executorService = MoreExecutors.listeningDecorator(
         
HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(),
 tf));
-    this.metricFormat = abfsConfiguration.getMetricFormat();
+    
this.isMetricCollectionEnabled.set(abfsConfiguration.isMetricsCollectionEnabled());
     this.isMetricCollectionStopped = new AtomicBoolean(false);
     this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout();
     this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout();
-    if (StringUtils.isNotEmpty(metricFormat.toString())) {
-      String metricAccountName = abfsConfiguration.getMetricAccount();
-      String metricAccountKey = abfsConfiguration.getMetricAccountKey();
-      if (StringUtils.isNotEmpty(metricAccountName) && 
StringUtils.isNotEmpty(metricAccountKey)) {
-        isMetricCollectionEnabled = true;
-        abfsCounters.initializeMetrics(metricFormat);
-        int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
-        if (dotIndex <= 0) {
-          throw new InvalidUriException(
-              metricAccountName + " - account name is not fully qualified.");
+    if (isMetricCollectionEnabled()) {
+      try {
+        String metricAccountName = abfsConfiguration.getMetricAccount();
+        String metricAccountKey = abfsConfiguration.getMetricAccountKey();
+        this.metricFormat = abfsConfiguration.getMetricFormat();
+        abfsCounters.initializeMetrics(metricFormat, getAbfsConfiguration());
+        if (isNotEmpty(metricAccountName) && isNotEmpty(
+            metricAccountKey)) {
+          int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
+          if (dotIndex <= 0) {
+            throw new InvalidUriException(

Review Comment:
   Add a test around this exception if not already there



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -338,7 +339,7 @@ void completeExecute(TracingContext tracingContext)
         LOG.debug("Rest operation {} failed with failureReason: {}. Retrying 
with retryCount = {}, retryPolicy: {} and sleepInterval: {}",
             operationType, failureReason, retryCount, 
retryPolicy.getAbbreviation(), retryInterval);
         if (abfsBackoffMetrics != null) {
-          updateBackoffTimeMetrics(retryCount, sleepDuration);
+          updateBackoffTimeMetrics(retryCount, retryInterval);

Review Comment:
   Why this change?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -136,6 +136,13 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = false;
   public static final int DEFAULT_METRIC_IDLE_TIMEOUT_MS = 60_000;
   public static final int DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS = 60_000;
+  public static final boolean DEFAULT_METRICS_COLLECTION_ENABLED = true;

Review Comment:
   We want to keep it enabled by default for all Cxs?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -281,42 +303,84 @@ private AbfsClient(final URL baseUrl,
         new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease 
Ops").setDaemon(true).build();
     this.executorService = MoreExecutors.listeningDecorator(
         
HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(),
 tf));
-    this.metricFormat = abfsConfiguration.getMetricFormat();
+    
this.isMetricCollectionEnabled.set(abfsConfiguration.isMetricsCollectionEnabled());
     this.isMetricCollectionStopped = new AtomicBoolean(false);
     this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout();
     this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout();
-    if (StringUtils.isNotEmpty(metricFormat.toString())) {
-      String metricAccountName = abfsConfiguration.getMetricAccount();
-      String metricAccountKey = abfsConfiguration.getMetricAccountKey();
-      if (StringUtils.isNotEmpty(metricAccountName) && 
StringUtils.isNotEmpty(metricAccountKey)) {
-        isMetricCollectionEnabled = true;
-        abfsCounters.initializeMetrics(metricFormat);
-        int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
-        if (dotIndex <= 0) {
-          throw new InvalidUriException(
-              metricAccountName + " - account name is not fully qualified.");
+    if (isMetricCollectionEnabled()) {
+      try {
+        String metricAccountName = abfsConfiguration.getMetricAccount();
+        String metricAccountKey = abfsConfiguration.getMetricAccountKey();
+        this.metricFormat = abfsConfiguration.getMetricFormat();
+        abfsCounters.initializeMetrics(metricFormat, getAbfsConfiguration());
+        if (isNotEmpty(metricAccountName) && isNotEmpty(
+            metricAccountKey)) {
+          int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
+          if (dotIndex <= 0) {
+            throw new InvalidUriException(
+                metricAccountName + " - account name is not fully qualified.");
+          }
+          try {
+            metricSharedkeyCredentials = new SharedKeyCredentials(
+                metricAccountName.substring(0, dotIndex),
+                metricAccountKey);
+            hasSeparateMetricAccount = true;
+            setMetricsUrl(metricAccountName.startsWith(HTTPS_SCHEME)
+                ? metricAccountName : HTTPS_SCHEME + COLON
+                + FORWARD_SLASH + FORWARD_SLASH + metricAccountName);
+          } catch (IllegalArgumentException e) {
+            throw new IOException(
+                "Exception while initializing metric credentials ", e);
+          }
+        } else {
+          setMetricsUrl(baseUrlString.substring(0, indexLastForwardSlash + 1));
         }
-        try {
-          metricSharedkeyCredentials = new SharedKeyCredentials(
-              metricAccountName.substring(0, dotIndex),
-              metricAccountKey);
-        } catch (IllegalArgumentException e) {
-          throw new IOException("Exception while initializing metric 
credentials ", e);
+
+        // register the client to Aggregated Metrics Manager
+        this.aggregateMetricsManager.registerClient(accountName, this);
+
+        // Metrics emitter scheduler
+        this.metricsEmitScheduler
+            = Executors.newSingleThreadScheduledExecutor();
+        // run every 1 minute to check the metrics count
+        this.metricsEmitScheduler.scheduleAtFixedRate(

Review Comment:
   It seems like 2 separate schedulers are being added here.
   Each client has its own scheduler and then the singleton metric manager 
class also has one?
   
   Is this as per design?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -71,7 +71,14 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = 
"fs.azure.account.key";
   public static final String FS_AZURE_METRIC_ACCOUNT_NAME = 
"fs.azure.metric.account.name";
   public static final String FS_AZURE_METRIC_ACCOUNT_KEY = 
"fs.azure.metric.account.key";
-  public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri";

Review Comment:
   Not needed anymore?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -372,19 +436,26 @@ public AbfsClient(final URL baseUrl,
       final SASTokenProvider sasTokenProvider,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext,
-      final AbfsServiceType abfsServiceType)
+      final String fileSystemId, final AbfsServiceType abfsServiceType)
       throws IOException {
     this(baseUrl, sharedKeyCredentials, abfsConfiguration,
-        encryptionContextProvider, abfsClientContext, abfsServiceType);
+        encryptionContextProvider, abfsClientContext, fileSystemId, 
abfsServiceType);
     this.sasTokenProvider = sasTokenProvider;
     this.tokenProvider = tokenProvider;
   }
 
   @Override
   public void close() throws IOException {
-    if (isMetricCollectionEnabled && runningTimerTask != null) {
-      runningTimerTask.cancel();
-      timer.cancel();
+    if (isMetricCollectionEnabled()) {

Review Comment:
   If not already done, verify that after FS close all the threads are properly 
shut down and there is no leak.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to