[ 
https://issues.apache.org/jira/browse/HADOOP-17461?focusedWorklogId=792243&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-792243
 ]

ASF GitHub Bot logged work on HADOOP-17461:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jul/22 16:48
            Start Date: 18/Jul/22 16:48
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on code in PR #4352:
URL: https://github.com/apache/hadoop/pull/4352#discussion_r920337912


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.lang.ref.WeakReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsSnapshot if thread level aggregation is enabled else returning
+ * an instance of EmptyIOStatisticsStore for the FS. An active instance of
+ * the IOStatisticsContext can be used to collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContextImpl implements IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContextImpl.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  /**
+   * Active IOStatistics Context containing different worker thread's
+   * statistics. Weak Reference so that it gets cleaned up during GC and we
+   * avoid any memory leak issues due to long lived references.
+   */
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT =
+      new WeakReferenceThreadMap<>(IOStatisticsContextImpl::createNewInstance,
+          IOStatisticsContextImpl::referenceLostContext
+  );
+
+  /**
+   * Collecting IOStatistics per thread.
+   */
+  private final WeakReferenceThreadMap<IOStatisticsSnapshot>
+      threadLevelIOStatisticsMap =
+      new WeakReferenceThreadMap<>(this::getIOStatisticsSnapshotFactory,
+          this::referenceLost);
+
+  static {
+    // Work out if the current context has thread level IOStatistics enabled.
+    final Configuration configuration = new Configuration();
+    IS_THREAD_IOSTATS_ENABLED =
+        configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+            THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+  }
+
+  /**
+   * Creating a new IOStatisticsContext instance for a FS to be used. If

Review Comment:
   cropped sentence at end



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.lang.ref.WeakReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsSnapshot if thread level aggregation is enabled else returning
+ * an instance of EmptyIOStatisticsStore for the FS. An active instance of
+ * the IOStatisticsContext can be used to collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContextImpl implements IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContextImpl.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  /**
+   * Active IOStatistics Context containing different worker thread's
+   * statistics. Weak Reference so that it gets cleaned up during GC and we
+   * avoid any memory leak issues due to long lived references.
+   */
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT =
+      new WeakReferenceThreadMap<>(IOStatisticsContextImpl::createNewInstance,
+          IOStatisticsContextImpl::referenceLostContext
+  );
+
+  /**
+   * Collecting IOStatistics per thread.
+   */
+  private final WeakReferenceThreadMap<IOStatisticsSnapshot>
+      threadLevelIOStatisticsMap =
+      new WeakReferenceThreadMap<>(this::getIOStatisticsSnapshotFactory,
+          this::referenceLost);
+
+  static {
+    // Work out if the current context has thread level IOStatistics enabled.
+    final Configuration configuration = new Configuration();
+    IS_THREAD_IOSTATS_ENABLED =
+        configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+            THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+  }
+
+  /**
+   * Creating a new IOStatisticsContext instance for a FS to be used. If
+   *
+   * @param key Thread ID that represents which thread the context belongs to.
+   * @return an instance of IOStatisticsContext.
+   */
+  private static IOStatisticsContext createNewInstance(Long key) {
+    if (!IS_THREAD_IOSTATS_ENABLED) {
+      return new EmptyIOStatisticsContextImpl();
+    }
+    return new IOStatisticsContextImpl();
+  }
+
+  /**
+   * A Method to act as an IOStatisticsSnapshot factory, in a
+   * WeakReferenceThreadMap.
+   *
+   * @param key ThreadID.
+   * @return an Instance of IOStatisticsSnapshot.
+   */
+  private IOStatisticsSnapshot getIOStatisticsSnapshotFactory(Long key) {
+    return new IOStatisticsSnapshot();
+  }
+
+  /**
+   * In case of reference loss.
+   *
+   * @param key ThreadID.
+   */
+  private void referenceLost(Long key) {
+    LOG.debug("Reference lost for threadID: {}", key);
+  }
+
+  /**
+   * In case of reference loss for IOStatisticsContext.
+   *
+   * @param key ThreadID.
+   */
+  private static void referenceLostContext(Long key) {
+    LOG.debug("Reference lost for threadID for the context: {}", key);
+  }
+
+  /**
+   * Get the current thread's IOStatisticsContext instance. If no instance is
+   * present for this thread ID, create one using the factory.
+   *
+   * @return instance of IOStatisticsContext.
+   */
+  @Override
+  public IOStatisticsContext getCurrentIOStatisticsContext() {
+    return ACTIVE_IOSTATS_CONTEXT.getForCurrentThread();
+  }
+
+  /**
+   * A Method to get the IOStatisticsAggregator of the currentThread. This
+   * denotes the aggregated IOStatistics per thread.
+   *
+   * @return the instance of IOStatisticsAggregator for the current thread.
+   */
+  @Override
+  public IOStatisticsAggregator getThreadIOStatisticsAggregator() {
+    // If the current Thread ID already have an aggregator assigned, return
+    // that.
+    boolean isThreadIOStatsPresent =

Review Comment:
   use the context map and ask it for its aggregator



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java:
##########
@@ -48,7 +49,15 @@ public long currentThreadId() {
   }
 
   public V setForCurrentThread(V newVal) {
-    return put(currentThreadId(), newVal);
+    long id = currentThreadId();

Review Comment:
   do we need a precondition on a null value here?
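
One possible answer to the precondition question, as a minimal sketch only. `ThreadValueMap` is a hypothetical stand-in written for illustration, not Hadoop's `WeakReferenceThreadMap`; it assumes a null value should be rejected outright, using the standard `java.util.Objects.requireNonNull`.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a per-thread map of weak references. */
class ThreadValueMap<V> {
  private final Map<Long, WeakReference<V>> map = new ConcurrentHashMap<>();

  /**
   * Store a value for the calling thread.
   * A null value is rejected before the map is touched.
   * @return the previous value, or null if there was none or it was collected.
   */
  V setForCurrentThread(V newVal) {
    Objects.requireNonNull(newVal, "null value for current thread");
    long id = Thread.currentThread().getId();
    WeakReference<V> old = map.put(id, new WeakReference<>(newVal));
    return old == null ? null : old.get();
  }

  /** @return the calling thread's value, or null if absent or collected. */
  V getForCurrentThread() {
    WeakReference<V> ref = map.get(Thread.currentThread().getId());
    return ref == null ? null : ref.get();
  }
}
```

Failing fast here keeps a null from being wrapped in a `WeakReference`, where it would later be indistinguishable from a collected entry.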



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -20,22 +20,28 @@
 
 import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 /**
  * An interface defined to capture thread-level IOStatistics by using per
- * thread context consisting of IOStatisticsSnapshot thread map for each
- * worker thread.
- * EmptyIOStatisticsSource is returned as an aggregator if this feature is
- * disabled, resulting in a no-op in aggregation.
+ * thread context.
+ * <p>
+ * The aggregator should be collected in their constructor by statistics-generating
+ * classes to obtain the aggregator to update <i>across all threads</i>.
+ * <p>
+ * The {@link #snapshot()} call creates a snapshot of the statistics;
+ * <p>
+ * The {@link #reset()} call resets the statistics in the current thread so
+ * that later snapshots will get the incremental data.
  */
-public interface IOStatisticsContext {
+public interface IOStatisticsContext extends IOStatisticsSource {
 
   /**
-   * Get the current thread's IOStatisticsContext.
+   * Get the IOStatisticsAggregator for the current thread.

Review Comment:
   "for the context"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -20,22 +20,28 @@
 
 import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 /**
  * An interface defined to capture thread-level IOStatistics by using per
- * thread context consisting of IOStatisticsSnapshot thread map for each
- * worker thread.
- * EmptyIOStatisticsSource is returned as an aggregator if this feature is
- * disabled, resulting in a no-op in aggregation.
+ * thread context.
+ * <p>
+ * The aggregator should be collected in their constructor by statistics-generating
+ * classes to obtain the aggregator to update <i>across all threads</i>.
+ * <p>
+ * The {@link #snapshot()} call creates a snapshot of the statistics;
+ * <p>
+ * The {@link #reset()} call resets the statistics in the current thread so
+ * that later snapshots will get the incremental data.
  */
-public interface IOStatisticsContext {
+public interface IOStatisticsContext extends IOStatisticsSource {
 
   /**
-   * Get the current thread's IOStatisticsContext.
+   * Get the IOStatisticsAggregator for the current thread.
    *
-   * @return instance of IOStatisticsContext for the current thread.
+   * @return return the aggregator for current thread.
    */
-  IOStatisticsContext getCurrentIOStatisticsContext();
+  IOStatisticsAggregator getAggregator();
 
   /**
    * Capture the snapshot of current thread's IOStatistics.

Review Comment:
   "of the context's"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.lang.ref.WeakReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsSnapshot if thread level aggregation is enabled else returning
+ * an instance of EmptyIOStatisticsStore for the FS. An active instance of
+ * the IOStatisticsContext can be used to collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContextImpl implements IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContextImpl.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  /**
+   * Active IOStatistics Context containing different worker thread's
+   * statistics. Weak Reference so that it gets cleaned up during GC and we
+   * avoid any memory leak issues due to long lived references.
+   */
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT =
+      new WeakReferenceThreadMap<>(IOStatisticsContextImpl::createNewInstance,
+          IOStatisticsContextImpl::referenceLostContext
+  );
+
+  /**
+   * Collecting IOStatistics per thread.
+   */
+  private final WeakReferenceThreadMap<IOStatisticsSnapshot>

Review Comment:
   the context map can support this; just look it up and call getThreadIOStatisticsAggregator() on it
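
A rough sketch of what this suggestion might look like: one static per-thread context map, with each context owning its single aggregator, so no second per-thread map is needed. `ThreadContext` and its members are hypothetical names, an `AtomicLong` stands in for the real aggregator, and a plain `ConcurrentHashMap` with strong references replaces the weak-reference map used in the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-thread context that owns exactly one aggregator. */
class ThreadContext {
  /** The only per-thread map: thread ID to context (strong references here). */
  private static final Map<Long, ThreadContext> CONTEXTS =
      new ConcurrentHashMap<>();

  /** Stand-in for the context's aggregator. */
  private final AtomicLong aggregator = new AtomicLong();

  /** Look up, or create on demand, the context of the calling thread. */
  static ThreadContext current() {
    return CONTEXTS.computeIfAbsent(
        Thread.currentThread().getId(), id -> new ThreadContext());
  }

  /** The aggregator belongs to the context, not to whichever thread calls. */
  AtomicLong getAggregator() {
    return aggregator;
  }
}
```

Callers would then write `ThreadContext.current().getAggregator()` once and hold on to the result, rather than consulting a second thread-keyed map inside each context.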



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java:
##########
@@ -66,20 +66,21 @@ public void setUp() throws Exception {
    */
   @BeforeClass
   public static void beforeClass() throws Exception {
-    currentIOStatisticsContext().resetThreadIOStatisticsForCurrentThread();
     // Do some work in constructor thread.
     S3AFileSystem fs = new S3AFileSystem();
     Configuration conf = new Configuration();
     fs.initialize(new URI(conf.get(TEST_FS_S3A_NAME)), conf);
     Path path = new Path("testConstructor");
+    IOStatisticsContextImpl ioStatisticsContext = fs.getIoStatisticsContext();

Review Comment:
   should be IOStatisticsContext



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -5231,4 +5234,13 @@ public RequestFactory getRequestFactory() {
   public boolean isCSEEnabled() {
     return isCSEEnabled;
   }
+
+  /**
+   * Get the FileSystem's IOStatisticsContext.
+   * @return the instance of IOStatisticsContextImpl.
+   */
+  @VisibleForTesting
+  public IOStatisticsContextImpl getIoStatisticsContext() {

Review Comment:
   don't cast. if tests want to cast it, and they know it is safe, they can do so



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java:
##########
@@ -66,6 +70,12 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractCommitITest.class);
 
+  /**
+   * Job statistics accrued across all test cases

Review Comment:
   needs a .



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -20,22 +20,28 @@
 
 import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 /**
  * An interface defined to capture thread-level IOStatistics by using per
- * thread context consisting of IOStatisticsSnapshot thread map for each
- * worker thread.
- * EmptyIOStatisticsSource is returned as an aggregator if this feature is
- * disabled, resulting in a no-op in aggregation.
+ * thread context.
+ * <p>
+ * The aggregator should be collected in their constructor by statistics-generating
+ * classes to obtain the aggregator to update <i>across all threads</i>.
+ * <p>
+ * The {@link #snapshot()} call creates a snapshot of the statistics;
+ * <p>
+ * The {@link #reset()} call resets the statistics in the current thread so
+ * that later snapshots will get the incremental data.
  */
-public interface IOStatisticsContext {
+public interface IOStatisticsContext extends IOStatisticsSource {
 
   /**
-   * Get the current thread's IOStatisticsContext.
+   * Get the IOStatisticsAggregator for the current thread.
    *
-   * @return instance of IOStatisticsContext for the current thread.
+   * @return return the aggregator for current thread.

Review Comment:
   "for the context"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.lang.ref.WeakReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsSnapshot if thread level aggregation is enabled else returning
+ * an instance of EmptyIOStatisticsStore for the FS. An active instance of
+ * the IOStatisticsContext can be used to collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContextImpl implements IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContextImpl.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  /**
+   * Active IOStatistics Context containing different worker thread's
+   * statistics. Weak Reference so that it gets cleaned up during GC and we
+   * avoid any memory leak issues due to long lived references.
+   */
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT =
+      new WeakReferenceThreadMap<>(IOStatisticsContextImpl::createNewInstance,
+          IOStatisticsContextImpl::referenceLostContext
+  );
+
+  /**
+   * Collecting IOStatistics per thread.
+   */
+  private final WeakReferenceThreadMap<IOStatisticsSnapshot>
+      threadLevelIOStatisticsMap =
+      new WeakReferenceThreadMap<>(this::getIOStatisticsSnapshotFactory,
+          this::referenceLost);
+
+  static {
+    // Work out if the current context has thread level IOStatistics enabled.
+    final Configuration configuration = new Configuration();
+    IS_THREAD_IOSTATS_ENABLED =
+        configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+            THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+  }
+
+  /**
+   * Creating a new IOStatisticsContext instance for a FS to be used. If
+   *
+   * @param key Thread ID that represents which thread the context belongs to.
+   * @return an instance of IOStatisticsContext.
+   */
+  private static IOStatisticsContext createNewInstance(Long key) {
+    if (!IS_THREAD_IOSTATS_ENABLED) {
+      return new EmptyIOStatisticsContextImpl();
+    }
+    return new IOStatisticsContextImpl();
+  }
+
+  /**
+   * A Method to act as an IOStatisticsSnapshot factory, in a
+   * WeakReferenceThreadMap.
+   *
+   * @param key ThreadID.
+   * @return an Instance of IOStatisticsSnapshot.
+   */
+  private IOStatisticsSnapshot getIOStatisticsSnapshotFactory(Long key) {
+    return new IOStatisticsSnapshot();
+  }
+
+  /**
+   * In case of reference loss.
+   *
+   * @param key ThreadID.
+   */
+  private void referenceLost(Long key) {
+    LOG.debug("Reference lost for threadID: {}", key);
+  }
+
+  /**
+   * In case of reference loss for IOStatisticsContext.
+   *
+   * @param key ThreadID.
+   */
+  private static void referenceLostContext(Long key) {
+    LOG.debug("Reference lost for threadID for the context: {}", key);
+  }
+
+  /**
+   * Get the current thread's IOStatisticsContext instance. If no instance is
+   * present for this thread ID, create one using the factory.
+   *
+   * @return instance of IOStatisticsContext.
+   */
+  @Override
+  public IOStatisticsContext getCurrentIOStatisticsContext() {
+    return ACTIVE_IOSTATS_CONTEXT.getForCurrentThread();
+  }
+
+  /**
+   * A Method to get the IOStatisticsAggregator of the currentThread. This
+   * denotes the aggregated IOStatistics per thread.
+   *
+   * @return the instance of IOStatisticsAggregator for the current thread.
+   */
+  @Override
+  public IOStatisticsAggregator getThreadIOStatisticsAggregator() {
+    // If the current Thread ID already have an aggregator assigned, return
+    // that.
+    boolean isThreadIOStatsPresent =
+        threadLevelIOStatisticsMap.containsKey(Thread.currentThread().getId());
+    if (isThreadIOStatsPresent) {
+      return threadLevelIOStatisticsMap.getForCurrentThread();
+    }
+    LOG.debug("No thread iostats present creating it for :{}",
+        Thread.currentThread().getId());
+    // If no aggregator is defined to the thread ID, create one and assign it.
+    IOStatisticsSnapshot ioStatisticsSnapshot = new IOStatisticsSnapshot();
+    setThreadIOStatistics(ioStatisticsSnapshot);
+    return ioStatisticsSnapshot;
+  }
+
+  /**
+   * Set the IOStatisticsSnapshot for the current context for a specific
+   * thread.
+   *
+   * @param ioStatisticsSnapshot IOStatisticsAggregator instance for the
+   *                               current thread.
+   */
+  public void setThreadIOStatistics(
+      IOStatisticsSnapshot ioStatisticsSnapshot) {
+    threadLevelIOStatisticsMap.setForCurrentThread(ioStatisticsSnapshot);
+  }
+
+  /**
+   * Returns a snapshot of the current thread's IOStatistics.
+   *
+   * @return IOStatisticsSnapshot of the current thread.
+   */
+  @Override
+  public IOStatisticsSnapshot snapshot() {
+    if (threadLevelIOStatisticsMap.containsKey(getCurrentThreadID())) {
+      return threadLevelIOStatisticsMap.get(getCurrentThreadID());
+    }
+    return new IOStatisticsSnapshot();
+  }
+
+  /**
+   * Reset the thread IOStatistics for current thread.
+   */
+  @Override
+  public void reset() {
+    WeakReference<IOStatisticsSnapshot> ioStatisticsSnapshotRef =
+        threadLevelIOStatisticsMap.lookup(getCurrentThreadID());
+    if (ioStatisticsSnapshotRef != null) {
+      IOStatisticsSnapshot ioStatisticsSnapshot = ioStatisticsSnapshotRef.get();
+      // Get the snapshot for the current thread ID and clear it.
+      if(ioStatisticsSnapshot != null) {
+        ioStatisticsSnapshot.clear();
+      }
+    }
+  }
+
+  /**
+   * Get the current thread's ID.
+   * @return long value of the thread ID.
+   */
+  private Long getCurrentThreadID() {
+    return Thread.currentThread().getId();
+  }
+
+  /**
+   * Get thread ID specific IOStatistics values.
+   *
+   * @param testThreadId thread ID.
+   * @return IOStatistics instance.
+   */
+  @VisibleForTesting
+  public IOStatistics getThreadSpecificIOStatistics(long testThreadId) {

Review Comment:
   tests should also look at the context map; lets them test more



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -20,22 +20,28 @@
 
 import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 /**
  * An interface defined to capture thread-level IOStatistics by using per
- * thread context consisting of IOStatisticsSnapshot thread map for each
- * worker thread.
- * EmptyIOStatisticsSource is returned as an aggregator if this feature is
- * disabled, resulting in a no-op in aggregation.
+ * thread context.
+ * <p>
+ * The aggregator should be collected in their constructor by statistics-generating
+ * classes to obtain the aggregator to update <i>across all threads</i>.
+ * <p>
+ * The {@link #snapshot()} call creates a snapshot of the statistics;
+ * <p>
+ * The {@link #reset()} call resets the statistics in the current thread so
+ * that later snapshots will get the incremental data.
  */
-public interface IOStatisticsContext {
+public interface IOStatisticsContext extends IOStatisticsSource {

Review Comment:
   all the javadocs saying "the current thread" except for the static method to do the lookup MUST be changed to just say "the context"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java:
##########
@@ -48,7 +49,15 @@ public long currentThreadId() {
   }
 
   public V setForCurrentThread(V newVal) {
-    return put(currentThreadId(), newVal);
+    long id = currentThreadId();
+
+    // if the same object is already in the map, just return it.
+    WeakReference<V> ref = lookup(id);
+    if (ref != null && ref.get() == newVal) {

Review Comment:
   if there's been a GC, ref.get() could return null. as we are comparing with == and not Object.equals(), that is safe.
   
   we should mention that in a comment
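
To illustrate why the identity comparison is safe, here is a minimal standalone sketch. `WeakIdentityCheck` and `holdsSameObject` are hypothetical names, not part of the PR; `WeakReference.get()` and `clear()` are the standard `java.lang.ref` calls.

```java
import java.lang.ref.WeakReference;

/** Hypothetical helper isolating the check under review. */
class WeakIdentityCheck {
  /** True only if the reference still points at exactly this object. */
  static <V> boolean holdsSameObject(WeakReference<V> ref, V newVal) {
    // ref.get() returns null once the referent has been collected or cleared.
    // == compares identities and never dereferences the result, so a null
    // here compares unequal to any non-null newVal: no NullPointerException.
    return ref != null && ref.get() == newVal;
  }
}
```

If `newVal` itself could be null, a cleared reference would compare equal to it, which is one reason the null precondition raised in the earlier comment on `setForCurrentThread` matters.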





Issue Time Tracking
-------------------

    Worklog Id:     (was: 792243)
    Time Spent: 5.5h  (was: 5h 20m)

> Add thread-level IOStatistics Context
> -------------------------------------
>
>                 Key: HADOOP-17461
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17461
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs, fs/azure, fs/s3
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Mehakmeet Singh
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> For effective reporting of the iostatistics of individual worker threads, we 
> need a thread-level context which IO components update.
> * this context needs to be passed into background threads performing work on 
> behalf of a task.
> * IO Components (streams, iterators, filesystems) need to update this context's 
> statistics as they perform work
> * Without double counting anything.
> I imagine a ThreadLocal IOStatisticContext which will be updated in the 
> FileSystem API Calls. This context MUST be passed into the background threads 
> used by a task, so that IO is correctly aggregated.
> I don't want streams, listIterators &c to do the updating as there is more 
> risk of double counting. However, we need to see their statistics if we want 
> to know things like "bytes discarded in backwards seeks". And I don't want to 
> be updating a shared context object on every read() call.
> If all we want is store IO (HEAD, GET, DELETE, list performance etc) then the 
> FS is sufficient. 
> If we do want the stream-specific detail, then I propose
> * caching the context in the constructor
> * updating it only in close() or unbuffer() (as we do from S3AInputStream to 
> S3AInstrumentation)
> * excluding those we know the FS already collects.
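
The stream-side proposal above (cache the context in the constructor, update it only in close()) could be sketched roughly as follows. Every name here is hypothetical: `BytesReadAggregator` stands in for the thread-level context and `CountingStream` for an FS input stream; neither is a Hadoop class, and unbuffer() is left out for brevity.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a thread-level statistics context. */
class BytesReadAggregator {
  private final AtomicLong bytesRead = new AtomicLong();

  void aggregate(long bytes) {
    bytesRead.addAndGet(bytes);
  }

  long total() {
    return bytesRead.get();
  }
}

/** Counts bytes locally; publishes to the cached context once, in close(). */
class CountingStream extends FilterInputStream {
  private final BytesReadAggregator context;
  private long localBytes;
  private boolean closed;

  CountingStream(InputStream in, BytesReadAggregator context) {
    super(in);
    // Cached at construction time, so the stream keeps reporting to the
    // creating task's context even if other threads read from it later.
    this.context = context;
  }

  @Override
  public int read() throws IOException {
    int b = super.read();
    if (b >= 0) {
      localBytes++; // no shared-context update on the read path
    }
    return b;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    int n = super.read(buf, off, len);
    if (n > 0) {
      localBytes += n;
    }
    return n;
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      closed = true; // guard so a repeated close() cannot double count
      context.aggregate(localBytes);
    }
    super.close();
  }
}
```

Keeping the counter local until close() avoids touching a shared object on every read() call, and the `closed` flag addresses the double-counting concern for repeated close() calls.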



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
