[ 
https://issues.apache.org/jira/browse/HADOOP-17461?focusedWorklogId=785213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785213
 ]

ASF GitHub Bot logged work on HADOOP-17461:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Jun/22 19:04
            Start Date: 27/Jun/22 19:04
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on code in PR #4352:
URL: https://github.com/apache/hadoop/pull/4352#discussion_r907678976


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.

Review Comment:
   The javadocs here don't match the latest code.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContext.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+      IOStatisticsContext::createNewInstance,

Review Comment:
   nit, indent this and L56



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContext.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+      IOStatisticsContext::createNewInstance,
+      IOStatisticsContext::referenceLostContext
+  );
+
+  /**
+   * Collecting IOStatistics per thread.
+   */
+  private final WeakReferenceThreadMap<IOStatisticsAggregator>
+      threadIOStatsContext = new WeakReferenceThreadMap<>(
+      this::getIOStatisticsAggregatorFactory,
+      this::referenceLost);
+
+  static {
+    // Work out if the current context have thread level IOStatistics enabled.

Review Comment:
   nit "has"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContext.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+      IOStatisticsContext::createNewInstance,
+      IOStatisticsContext::referenceLostContext
+  );
+
+  /**
+   * Collecting IOStatistics per thread.
+   */
+  private final WeakReferenceThreadMap<IOStatisticsAggregator>
+      threadIOStatsContext = new WeakReferenceThreadMap<>(
+      this::getIOStatisticsAggregatorFactory,
+      this::referenceLost);
+
+  static {
+    // Work out if the current context have thread level IOStatistics enabled.
+    final Configuration configuration = new Configuration();
+    IS_THREAD_IOSTATS_ENABLED =
+        configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+            THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+  }
+
+  /**
+   * Creating a new IOStatisticsContext instance for a FS to be used.
+   *
+   * @param key Thread ID that represents which thread the context belongs to.
+   * @return an instance of IOStatisticsContext.
+   */
+  private static IOStatisticsContext createNewInstance(Long key) {
+    return new IOStatisticsContext();
+  }
+
+  /**
+   * Get the current IOStatisticsContext.
+   *
+   * @return current IOStatisticsContext instance.
+   */
+  public static IOStatisticsContext currentIOStatisticsContext() {
+    return ACTIVE_IOSTATS_CONTEXT.get(Thread.currentThread().getId());
+  }
+
+  /**
+   * A Method to act as an IOStatisticsSnapshot factory, in a
+   * WeakReferenceThreadMap.
+   *
+   * @param key ThreadID.
+   * @return an Instance of IOStatisticsSnapshot.
+   */
+  private IOStatisticsAggregator getIOStatisticsAggregatorFactory(Long key) {
+    return getThreadIOStatistics();
+  }
+
+  /**
+   * In case of reference loss.
+   *
+   * @param key ThreadID.
+   */
+  private void referenceLost(Long key) {
+    LOG.debug("Reference lost for threadID: {}", key);
+  }
+
+  /**
+   * In case of reference loss for IOStatisticsContext.
+   *
+   * @param key ThreadID.
+   */
+  private static void referenceLostContext(Long key) {
+    LOG.debug("Reference lost for threadID for the context: {}", key);
+  }
+
+  /**
+   * A Method to get the IOStatisticsAggregator of the currentThread. This
+   * denotes the aggregated IOStatistics per thread.
+   *
+   * @return the instance of IOStatisticsAggregator for the current thread.
+   */
+  public IOStatisticsAggregator getThreadIOStatistics() {
+    // If Thread IOStats is disabled we return an emptyIOStatistics instance
+    // back.
+    if (!IS_THREAD_IOSTATS_ENABLED) {
+      return EmptyIOStatisticsStore.getInstance();
+    }
+
+    // If the current Thread ID already have an aggregator assigned, return
+    // that.
+    boolean isThreadIOStatsPresent =
+        threadIOStatsContext.containsKey(Thread.currentThread().getId());
+    if (isThreadIOStatsPresent) {
+      return threadIOStatsContext.getForCurrentThread();
+    }
+
+    // If no aggregator is defined to the thread ID, create one and assign it.
+    IOStatisticsSnapshot ioStatisticsSnapshot = new IOStatisticsSnapshot();
+    setThreadIOStatistics(ioStatisticsSnapshot);
+    return ioStatisticsSnapshot;
+  }
+
+  /**
+   * Set the IOStatisticsAggregator for the current context for a specific
+   * thread.
+   *
+   * @param ioStatisticsAggregator IOStatisticsAggregator instance for the
+   *                               current thread.
+   */
+  public void setThreadIOStatistics(
+      IOStatisticsAggregator ioStatisticsAggregator) {
+    threadIOStatsContext.setForCurrentThread(ioStatisticsAggregator);
+  }
+
+  /**
+   * Returns a snapshot of the current thread's IOStatistics.
+   *
+   * @return IOStatisticsSnapshot of the current thread.
+   */
+  public IOStatisticsSnapshot getThreadIOStatisticsSnapshot() {
+    if (IS_THREAD_IOSTATS_ENABLED) {
+      return (IOStatisticsSnapshot) getThreadIOStatistics();
+    }
+    return new IOStatisticsSnapshot();
+  }
+
+  /**
+   * Get thread ID specific IOStatistics values.
+   *
+   * @param testThreadId thread ID.
+   * @return IOStatistics instance.
+   */
+  @VisibleForTesting
+  public IOStatistics getThreadIOStatistics(long testThreadId) {
+    LOG.info("IOStatsContext thread ID required: {}", testThreadId);

Review Comment:
   info? or debug?



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContext.currentIOStatisticsContext;
+
+/**
+ * Tests to verify the Thread-level IOStatistics.
+ */
+public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
+
+  private static final int SMALL_THREADS = 2;
+  private static final int BYTES_BIG = 100;
+  private static final int BYTES_SMALL = 50;
+  private static final long TEST_THREAD_ID = Thread.currentThread().getId();
+
+  /**
+   * Run this before the tests once, to note down some work in the
+   * constructor thread to be verified later on in a test.
+   */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // Do some work in constructor thread.
+    S3AFileSystem fs = new S3AFileSystem();
+    Configuration conf = new Configuration();
+    fs.initialize(new URI(conf.get(TEST_FS_S3A_NAME)), conf);
+    Path path = new Path("testConstructor");
+    try (FSDataOutputStream out = fs.create(path)) {
+      out.write('a');
+    }
+    try (FSDataInputStream in = fs.open(path)) {
+      in.read();
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration configuration = super.createConfiguration();
+    removeBaseAndBucketOverrides(configuration,
+        THREAD_LEVEL_IOSTATISTICS_ENABLED);
+    return configuration;
+  }
+
+  /**
+   * Verify that S3AInputStream aggregates per thread IOStats collection
+   * correctly.
+   */
+  @Test
+  public void testS3AInputStreamIOStatisticsContext()
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] data = dataset(256, 'a', 'z');
+    byte[] readDataFirst = new byte[BYTES_BIG];
+    byte[] readDataSecond = new byte[BYTES_SMALL];
+    writeDataset(fs, path, data, data.length, 1024, true);
+
+    final ExecutorService executor =
+        HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+    CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+    try {
+      for (int i = 0; i < SMALL_THREADS; i++) {
+        executor.submit(() -> {
+          try {
+            IOStatistics ioStatisticsFirst;
+            try (FSDataInputStream in = fs.open(path)) {
+              in.seek(50);
+              in.read(readDataFirst);
+              in.close();
+              ioStatisticsFirst = assertThreadStatisticsBytesRead(in,
+                  BYTES_BIG);
+            }
+            // Stream is closed for a thread. Re-open and do more operations.
+            IOStatistics ioStatisticsSecond;
+            try (FSDataInputStream in = fs.open(path)) {
+              in.seek(100);
+              in.read(readDataSecond);
+              in.close();
+              ioStatisticsSecond = assertThreadStatisticsBytesRead(in,
+                  BYTES_BIG + BYTES_SMALL);
+            }
+            latch.countDown();
+          } catch (Exception e) {
+            latch.countDown();
+            setFutureException(e);
+            LOG.error("An error occurred while doing a task in the thread", e);
+          } catch (AssertionError ase) {
+            latch.countDown();
+            setFutureAse(ase);
+            throw ase;
+          }
+        });
+      }
+      // wait for tasks to finish.
+      latch.await();
+    } finally {
+      executor.shutdown();
+    }
+
+    // Check if an Excp or ASE was caught while the test threads were running.
+    maybeReThrowFutureException();
+    maybeReThrowFutureASE();
+
+  }
+
+  /**
+   * Verify that S3ABlockOutputStream aggregates per thread IOStats collection
+   * correctly.
+   */
+  @Test
+  public void testS3ABlockOutputStreamIOStatisticsContext()
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] writeDataFirst = new byte[BYTES_BIG];
+    byte[] writeDataSecond = new byte[BYTES_SMALL];
+
+    final ExecutorService executor =
+        HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+    CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+    try {
+      for (int i = 0; i < SMALL_THREADS; i++) {
+        executor.submit(() -> {
+          try {
+            IOStatistics ioStatisticsFirst;
+            try (FSDataOutputStream out = fs.create(path)) {
+              out.write(writeDataFirst);
+              out.close();
+              ioStatisticsFirst = assertThreadStatisticsBytesWrite(out,
+                  BYTES_BIG);
+            }
+            // Stream is closed for a thread. Re-open and do more operations.
+            IOStatistics ioStatisticsSecond;
+            try (FSDataOutputStream out = fs.create(path)) {
+              out.write(writeDataSecond);
+              out.close();
+              ioStatisticsSecond = assertThreadStatisticsBytesWrite(out,
+                  BYTES_BIG + BYTES_SMALL);
+            }
+            latch.countDown();
+          } catch (Exception e) {
+            latch.countDown();
+            setFutureException(e);
+            LOG.error("An error occurred while doing a task in the thread", e);
+          } catch (AssertionError ase) {
+            latch.countDown();
+            setFutureAse(ase);
+            throw ase;
+          }
+        });
+      }
+      // wait for tasks to finish.
+      latch.await();
+    } finally {
+      executor.shutdown();
+    }
+
+    // Check if an Excp or ASE was caught while the test threads were running.
+    maybeReThrowFutureException();
+    maybeReThrowFutureASE();
+
+  }
+
+  /**
+   * Verify stats collection and aggregation for constructor thread, Junit
+   * thread and a worker thread.
+   */
+  @Test
+  public void testThreadIOStatisticsForDifferentThreads()
+      throws IOException, InterruptedException {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] data = new byte[BYTES_BIG];
+    long threadIdForTest = Thread.currentThread().getId();
+
+    // Write in the Junit thread.
+    try (FSDataOutputStream out = fs.create(path)) {
+      out.write(data);
+    }
+
+    // Read in the Junit thread.
+    try (FSDataInputStream in = fs.open(path)) {
+      in.read(data);
+    }
+
+    // Worker thread work and wait for it to finish.
+    TestWorkerThread workerThread = new TestWorkerThread();
+    long workerThreadID = workerThread.getId();
+    workerThread.start();
+    workerThread.join();
+
+    // Work done in constructor: Wrote and Read 1 byte.
+    // Work done in Junit thread: Wrote and Read BYTES_BIG bytes.
+    // Work done in Junit's worker thread: Wrote and Read BYTES_SMALL bytes.
+    assertThreadStatisticsForThread(TEST_THREAD_ID, 1);
+    assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG);
+    assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL);
+
+  }
+
+  /**
+   * Assert bytes wrote by the current thread.
+   *
+   * @param out        OutputStream.
+   * @param writeBytes expected bytes.
+   * @return IOStatistics for this stream.
+   */
+  private IOStatistics assertThreadStatisticsBytesWrite(FSDataOutputStream out,
+      int writeBytes) {
+    S3ABlockOutputStream s3aOut = (S3ABlockOutputStream) out.getWrappedStream();
+    IOStatistics ioStatistics =
+        (IOStatisticsSnapshot) s3aOut.getThreadIOStatistics();
+    IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+        StreamStatisticNames.STREAM_WRITE_BYTES)
+        .describedAs("Bytes wrote are not as expected")
+        .isEqualTo(writeBytes);
+
+    return ioStatistics;
+  }
+
+  /**
+   * Assert bytes read by the current thread.
+   *
+   * @param in        InputStream.
+   * @param readBytes expected bytes.
+   * @return IOStatistics for this stream.
+   */
+  private IOStatistics assertThreadStatisticsBytesRead(FSDataInputStream in,
+      int readBytes) {
+    S3AInputStream s3AInputStream =
+        (S3AInputStream) in.getWrappedStream();
+    IOStatistics ioStatistics = s3AInputStream.getThreadIOStatistics();
+    IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+        StreamStatisticNames.STREAM_READ_BYTES)
+        .describedAs("Bytes read are not as expected")
+        .isEqualTo(readBytes);
+
+    return ioStatistics;
+  }
+
+  /**
+   * Assert fixed bytes wrote and read for a particular thread ID.
+   *
+   * @param testThreadId                thread ID.
+   * @param expectedBytesWrittenAndRead expected bytes.
+   */
+  private void assertThreadStatisticsForThread(long testThreadId,
+      int expectedBytesWrittenAndRead) {
+    LOG.info("Thread ID to be asserted: {}", testThreadId);
+    IOStatistics ioStatistics =
+        currentIOStatisticsContext().getThreadIOStatistics(testThreadId);
+
+    IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+        StreamStatisticNames.STREAM_WRITE_BYTES)
+        .describedAs("Bytes wrote are not as expected for thread :{}",
+            testThreadId)
+        .isEqualTo(expectedBytesWrittenAndRead);
+
+    IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+        StreamStatisticNames.STREAM_READ_BYTES)
+        .describedAs("Bytes read are not as expected for thread :{}",
+            testThreadId)
+        .isEqualTo(expectedBytesWrittenAndRead);
+  }
+
+  /**
+   * Simulating doing some work in a separate thread.
+   */
+  private class TestWorkerThread extends Thread implements Runnable {
+    @Override
+    public void run() {
+      S3AFileSystem fs = getFileSystem();
+      Path path = new Path("workerThread");

Review Comment:
   make path a field, pass in the method path in its constructor. I want to be able to run multiple hadoop-aws test runs in the same bucket ASAP.
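
   A minimal sketch of that suggestion, not the PR's actual code: the worker keeps its path as a final field set through the constructor, so the caller can hand in a per-method path instead of a fixed "workerThread" name. The class name `WorkerThreadSketch`, the helper `runWorker`, and the `getPathUsed` accessor are hypothetical, and a plain `String` stands in for `org.apache.hadoop.fs.Path` so the example compiles without Hadoop on the classpath.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sketch only: a worker thread that receives its path from the caller.
 * A String stands in for org.apache.hadoop.fs.Path (assumption made so
 * the example is self-contained).
 */
final class WorkerThreadSketch {

  /** Worker whose target path is a field assigned in the constructor. */
  static final class TestWorkerThread extends Thread {
    // Path to work on; supplied by the test method rather than hard-coded.
    private final String workerThreadPath;
    // Records the path actually used by run(), for later verification.
    private final AtomicReference<String> pathUsed = new AtomicReference<>();

    TestWorkerThread(String workerThreadPath) {
      this.workerThreadPath = workerThreadPath;
    }

    @Override
    public void run() {
      // In the real test, the create/write and open/read calls against
      // the filesystem would go here, using workerThreadPath.
      pathUsed.set(workerThreadPath);
    }

    String getPathUsed() {
      return pathUsed.get();
    }
  }

  /** Starts a worker on the given path, waits for it, returns the path used. */
  static String runWorker(String methodPath) {
    TestWorkerThread worker = new TestWorkerThread(methodPath);
    worker.start();
    try {
      worker.join();
    } catch (InterruptedException e) {
      // Restore the interrupt flag and surface the failure unchecked.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for worker", e);
    }
    return worker.getPathUsed();
  }
}
```

   In the test itself the caller would presumably build the argument from the existing `path(getMethodName())` helper, so concurrent test runs sharing a bucket would not write to the same object.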



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContext.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+      IOStatisticsContext::createNewInstance,
+      IOStatisticsContext::referenceLostContext
+  );
+
+  /**
+   * Collecting IOStatistics per thread.
+   */
+  private final WeakReferenceThreadMap<IOStatisticsAggregator>
+      threadIOStatsContext = new WeakReferenceThreadMap<>(
+      this::getIOStatisticsAggregatorFactory,

Review Comment:
   same; indent the args to the constructor



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java:
##########
@@ -41,6 +41,8 @@
 import com.amazonaws.services.s3.model.UploadPartRequest;
 
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.classification.VisibleForTesting;

Review Comment:
   nit, should go above L43



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -37,6 +37,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;

Review Comment:
   move down to where the "real" hadoop imports are



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContext.class);
+  private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+      IOStatisticsContext::createNewInstance,
+      IOStatisticsContext::referenceLostContext
+  );
+
+  /**
+   * Collecting IOStatistics per thread.
+   */
+  private final WeakReferenceThreadMap<IOStatisticsAggregator>
+      threadIOStatsContext = new WeakReferenceThreadMap<>(
+      this::getIOStatisticsAggregatorFactory,
+      this::referenceLost);
+
+  static {
+    // Work out if the current context have thread level IOStatistics enabled.
+    final Configuration configuration = new Configuration();
+    IS_THREAD_IOSTATS_ENABLED =
+        configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+            THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+  }
+
+  /**
+   * Creating a new IOStatisticsContext instance for a FS to be used.
+   *
+   * @param key Thread ID that represents which thread the context belongs to.
+   * @return an instance of IOStatisticsContext.
+   */
+  private static IOStatisticsContext createNewInstance(Long key) {
+    return new IOStatisticsContext();
+  }
+
+  /**
+   * Get the current IOStatisticsContext.
+   *
+   * @return current IOStatisticsContext instance.
+   */
+  public static IOStatisticsContext currentIOStatisticsContext() {
+    return ACTIVE_IOSTATS_CONTEXT.get(Thread.currentThread().getId());
+  }
+
+  /**
+   * A Method to act as an IOStatisticsSnapshot factory, in a
+   * WeakReferenceThreadMap.
+   *
+   * @param key ThreadID.
+   * @return an Instance of IOStatisticsSnapshot.
+   */
+  private IOStatisticsAggregator getIOStatisticsAggregatorFactory(Long key) {
+    return getThreadIOStatistics();
+  }
+
+  /**
+   * In case of reference loss.
+   *
+   * @param key ThreadID.
+   */
+  private void referenceLost(Long key) {
+    LOG.debug("Reference lost for threadID: {}", key);
+  }
+
+  /**
+   * In case of reference loss for IOStatisticsContext.
+   *
+   * @param key ThreadID.
+   */
+  private static void referenceLostContext(Long key) {
+    LOG.debug("Reference lost for threadID for the context: {}", key);
+  }
+
+  /**
+   * A Method to get the IOStatisticsAggregator of the currentThread. This
+   * denotes the aggregated IOStatistics per thread.
+   *
+   * @return the instance of IOStatisticsAggregator for the current thread.
+   */
+  public IOStatisticsAggregator getThreadIOStatistics() {
+    // If Thread IOStats is disabled we return an emptyIOStatistics instance
+    // back.
+    if (!IS_THREAD_IOSTATS_ENABLED) {
+      return EmptyIOStatisticsStore.getInstance();
+    }
+
+    // If the current Thread ID already have an aggregator assigned, return
+    // that.
+    boolean isThreadIOStatsPresent =
+        threadIOStatsContext.containsKey(Thread.currentThread().getId());
+    if (isThreadIOStatsPresent) {
+      return threadIOStatsContext.getForCurrentThread();
+    }
+
+    // If no aggregator is defined to the thread ID, create one and assign it.
+    IOStatisticsSnapshot ioStatisticsSnapshot = new IOStatisticsSnapshot();
+    setThreadIOStatistics(ioStatisticsSnapshot);
+    return ioStatisticsSnapshot;
+  }
+
+  /**
+   * Set the IOStatisticsAggregator for the current context for a specific
+   * thread.
+   *
+   * @param ioStatisticsAggregator IOStatisticsAggregator instance for the
+   *                               current thread.
+   */
+  public void setThreadIOStatistics(
+      IOStatisticsAggregator ioStatisticsAggregator) {
+    threadIOStatsContext.setForCurrentThread(ioStatisticsAggregator);
+  }
+
+  /**
+   * Returns a snapshot of the current thread's IOStatistics.
+   *
+   * @return IOStatisticsSnapshot of the current thread.
+   */
+  public IOStatisticsSnapshot getThreadIOStatisticsSnapshot() {

Review Comment:
   How does this get used? It is not a snapshot if another thread is 
actively updating the statistics.
   
   If this is to be used to get a copy of the stats, we should call the method 
`snapshotCurrentThreadIOStatistics()` and always return a snapshot: either an 
empty one or one taken from the current thread's stats.
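
   A minimal sketch of the "always return a snapshot" behaviour suggested 
above. It uses only JDK types: the class `SnapshotSketch`, its `increment` 
method and the map of counters are hypothetical stand-ins for the Hadoop 
statistics classes, and only the method name 
`snapshotCurrentThreadIOStatistics()` comes from this review.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the per-thread statistics context. */
final class SnapshotSketch {

  // Live counters keyed by thread ID; these keep changing as IO happens.
  private final Map<Long, Map<String, Long>> live = new ConcurrentHashMap<>();

  /** Add a delta to a named counter of the calling thread. */
  void increment(String key, long delta) {
    live.computeIfAbsent(Thread.currentThread().getId(),
        id -> new ConcurrentHashMap<>())
        .merge(key, delta, Long::sum);
  }

  /**
   * Always returns a detached copy: an empty map if the calling thread
   * has recorded nothing, otherwise a copy of its counters so far.
   */
  Map<String, Long> snapshotCurrentThreadIOStatistics() {
    Map<String, Long> current = live.get(Thread.currentThread().getId());
    return current == null ? new HashMap<>() : new HashMap<>(current);
  }
}
```

   Because the returned map is a copy, later updates to the live counters do 
not change a snapshot that a caller already holds.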



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContext.currentIOStatisticsContext;
+
+/**
+ * Tests to verify the Thread-level IOStatistics.
+ */
+public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
+
+  private static final int SMALL_THREADS = 2;
+  private static final int BYTES_BIG = 100;
+  private static final int BYTES_SMALL = 50;
+  private static final long TEST_THREAD_ID = Thread.currentThread().getId();
+
+  /**
+   * Run this before the tests once, to note down some work in the
+   * constructor thread to be verified later on in a test.
+   */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // Do some work in constructor thread.
+    S3AFileSystem fs = new S3AFileSystem();
+    Configuration conf = new Configuration();
+    fs.initialize(new URI(conf.get(TEST_FS_S3A_NAME)), conf);
+    Path path = new Path("testConstructor");
+    try (FSDataOutputStream out = fs.create(path)) {
+      out.write('a');
+    }
+    try (FSDataInputStream in = fs.open(path)) {
+      in.read();
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration configuration = super.createConfiguration();
+    removeBaseAndBucketOverrides(configuration,
+        THREAD_LEVEL_IOSTATISTICS_ENABLED);
+    return configuration;
+  }
+
+  /**
+   * Verify that S3AInputStream aggregates per thread IOStats collection
+   * correctly.
+   */
+  @Test
+  public void testS3AInputStreamIOStatisticsContext()
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] data = dataset(256, 'a', 'z');
+    byte[] readDataFirst = new byte[BYTES_BIG];
+    byte[] readDataSecond = new byte[BYTES_SMALL];
+    writeDataset(fs, path, data, data.length, 1024, true);
+
+    final ExecutorService executor =
+        HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+    CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+    try {
+      for (int i = 0; i < SMALL_THREADS; i++) {
+        executor.submit(() -> {
+          try {
+            IOStatistics ioStatisticsFirst;
+            try (FSDataInputStream in = fs.open(path)) {
+              in.seek(50);
+              in.read(readDataFirst);
+              in.close();
+              ioStatisticsFirst = assertThreadStatisticsBytesRead(in,
+                  BYTES_BIG);
+            }
+            // Stream is closed for a thread. Re-open and do more operations.
+            IOStatistics ioStatisticsSecond;
+            try (FSDataInputStream in = fs.open(path)) {
+              in.seek(100);
+              in.read(readDataSecond);
+              in.close();
+              ioStatisticsSecond = assertThreadStatisticsBytesRead(in,
+                  BYTES_BIG + BYTES_SMALL);
+            }
+            latch.countDown();
+          } catch (Exception e) {
+            latch.countDown();
+            setFutureException(e);
+            LOG.error("An error occurred while doing a task in the thread", e);
+          } catch (AssertionError ase) {
+            latch.countDown();
+            setFutureAse(ase);
+            throw ase;
+          }
+        });
+      }
+      // wait for tasks to finish.
+      latch.await();
+    } finally {
+      executor.shutdown();
+    }
+
+    // Check if an Excp or ASE was caught while the test threads were running.
+    maybeReThrowFutureException();
+    maybeReThrowFutureASE();
+
+  }
+
+  /**
+   * Verify that S3ABlockOutputStream aggregates per thread IOStats collection
+   * correctly.
+   */
+  @Test
+  public void testS3ABlockOutputStreamIOStatisticsContext()
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] writeDataFirst = new byte[BYTES_BIG];
+    byte[] writeDataSecond = new byte[BYTES_SMALL];
+
+    final ExecutorService executor =
+        HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+    CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+    try {
+      for (int i = 0; i < SMALL_THREADS; i++) {
+        executor.submit(() -> {
+          try {
+            IOStatistics ioStatisticsFirst;
+            try (FSDataOutputStream out = fs.create(path)) {
+              out.write(writeDataFirst);
+              out.close();
+              ioStatisticsFirst = assertThreadStatisticsBytesWrite(out,
+                  BYTES_BIG);
+            }
+            // Stream is closed for a thread. Re-open and do more operations.
+            IOStatistics ioStatisticsSecond;
+            try (FSDataOutputStream out = fs.create(path)) {
+              out.write(writeDataSecond);
+              out.close();
+              ioStatisticsSecond = assertThreadStatisticsBytesWrite(out,
+                  BYTES_BIG + BYTES_SMALL);
+            }
+            latch.countDown();
+          } catch (Exception e) {
+            latch.countDown();
+            setFutureException(e);
+            LOG.error("An error occurred while doing a task in the thread", e);
+          } catch (AssertionError ase) {
+            latch.countDown();
+            setFutureAse(ase);
+            throw ase;
+          }
+        });
+      }
+      // wait for tasks to finish.
+      latch.await();
+    } finally {
+      executor.shutdown();
+    }
+
+    // Check if an Excp or ASE was caught while the test threads were running.
+    maybeReThrowFutureException();
+    maybeReThrowFutureASE();
+
+  }
+
+  /**
+   * Verify stats collection and aggregation for constructor thread, Junit
+   * thread and a worker thread.
+   */
+  @Test
+  public void testThreadIOStatisticsForDifferentThreads()
+      throws IOException, InterruptedException {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] data = new byte[BYTES_BIG];
+    long threadIdForTest = Thread.currentThread().getId();
+
+    // Write in the Junit thread.
+    try (FSDataOutputStream out = fs.create(path)) {
+      out.write(data);
+    }
+
+    // Read in the Junit thread.
+    try (FSDataInputStream in = fs.open(path)) {
+      in.read(data);
+    }
+
+    // Worker thread work and wait for it to finish.
+    TestWorkerThread workerThread = new TestWorkerThread();
+    long workerThreadID = workerThread.getId();
+    workerThread.start();
+    workerThread.join();
+
+    // Work done in constructor: Wrote and Read 1 byte.
+    // Work done in Junit thread: Wrote and Read BYTES_BIG bytes.
+    // Work done in Junit's worker thread: Wrote and Read BYTES_SMALL bytes.
+    assertThreadStatisticsForThread(TEST_THREAD_ID, 1);
+    assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG);
+    assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL);
+
+  }
+
+  /**
+   * Assert bytes wrote by the current thread.
+   *
+   * @param out        OutputStream.
+   * @param writeBytes expected bytes.
+   * @return IOStatistics for this stream.
+   */
+  private IOStatistics assertThreadStatisticsBytesWrite(FSDataOutputStream out,
+      int writeBytes) {
+    S3ABlockOutputStream s3aOut = (S3ABlockOutputStream) out.getWrappedStream();
+    IOStatistics ioStatistics =
+        (IOStatisticsSnapshot) s3aOut.getThreadIOStatistics();
+    IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+        StreamStatisticNames.STREAM_WRITE_BYTES)
+        .describedAs("Bytes wrote are not as expected")
+        .isEqualTo(writeBytes);
+
+    return ioStatistics;
+  }
+
+  /**
+   * Assert bytes read by the current thread.
+   *
+   * @param in        InputStream.
+   * @param readBytes expected bytes.
+   * @return IOStatistics for this stream.
+   */
+  private IOStatistics assertThreadStatisticsBytesRead(FSDataInputStream in,
+      int readBytes) {
+    S3AInputStream s3AInputStream =
+        (S3AInputStream) in.getWrappedStream();
+    IOStatistics ioStatistics = s3AInputStream.getThreadIOStatistics();
+    IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+        StreamStatisticNames.STREAM_READ_BYTES)
+        .describedAs("Bytes read are not as expected")
+        .isEqualTo(readBytes);
+
+    return ioStatistics;
+  }
+
+  /**
+   * Assert fixed bytes wrote and read for a particular thread ID.
+   *
+   * @param testThreadId                thread ID.
+   * @param expectedBytesWrittenAndRead expected bytes.
+   */
+  private void assertThreadStatisticsForThread(long testThreadId,
+      int expectedBytesWrittenAndRead) {
+    LOG.info("Thread ID to be asserted: {}", testThreadId);
+    IOStatistics ioStatistics =
+        currentIOStatisticsContext().getThreadIOStatistics(testThreadId);
+
+    IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+        StreamStatisticNames.STREAM_WRITE_BYTES)
+        .describedAs("Bytes wrote are not as expected for thread :{}",

Review Comment:
   nit "written"



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -124,13 +124,15 @@
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;

Review Comment:
   nit, move down to L133





Issue Time Tracking
-------------------

    Worklog Id:     (was: 785213)
    Time Spent: 2.5h  (was: 2h 20m)

> Add thread-level IOStatistics Context
> -------------------------------------
>
>                 Key: HADOOP-17461
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17461
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs, fs/azure, fs/s3
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Mehakmeet Singh
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> For effective reporting of the iostatistics of individual worker threads, we 
> need a thread-level context which IO components update.
> * this context needs to be passed into the background threads performing work 
> on behalf of a task.
> * IO Components (streams, iterators, filesystems) need to update this 
> context's statistics as they perform work
> * Without double counting anything.
> I imagine a ThreadLocal IOStatisticContext which will be updated in the 
> FileSystem API Calls. This context MUST be passed into the background threads 
> used by a task, so that IO is correctly aggregated.
> I don't want streams, listIterators &c to do the updating as there is more 
> risk of double counting. However, we need to see their statistics if we want 
> to know things like "bytes discarded in backwards seeks". And I don't want to 
> be updating a shared context object on every read() call.
> If all we want is store IO (HEAD, GET, DELETE, list performance etc) then the 
> FS is sufficient. 
> If we do want the stream-specific detail, then I propose
> * caching the context in the constructor
> * updating it only in close() or unbuffer() (as we do from S3AInputStream to 
> S3AInstrumentation)
> * excluding those we know the FS already collects.
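
The stream-level proposal in the description (cache the context in the 
constructor, update it only in close()) could look roughly like the sketch 
below. It uses only JDK types; `ContextSketch`, `StreamSketch` and the 
counter name are hypothetical and are not the classes in the PR.

```java
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-thread context holding named counters. */
final class ContextSketch {
  private static final ThreadLocal<ContextSketch> CURRENT =
      ThreadLocal.withInitial(ContextSketch::new);

  private final Map<String, Long> counters = new ConcurrentHashMap<>();

  /** Context of the calling thread, created on first use. */
  static ContextSketch current() {
    return CURRENT.get();
  }

  void add(String key, long delta) {
    counters.merge(key, delta, Long::sum);
  }

  long get(String key) {
    return counters.getOrDefault(key, 0L);
  }
}

/** Hypothetical stream: counts locally, publishes once on close(). */
final class StreamSketch implements Closeable {
  // Cached in the constructor, so the creating thread's context is
  // updated even if close() runs on a different thread.
  private final ContextSketch context = ContextSketch.current();
  private long bytesRead;
  private boolean closed;

  /** Record a read; no shared object is touched on this path. */
  void read(int len) {
    bytesRead += len;
  }

  @Override
  public void close() {
    if (!closed) {
      closed = true;
      // Single update of the shared context; a repeated close()
      // is a no-op, which avoids double counting.
      context.add("stream_read_bytes", bytesRead);
    }
  }
}
```

Nothing is written to the context on each read() call, and the closed flag 
keeps a second close() from adding the same bytes twice.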



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
