[
https://issues.apache.org/jira/browse/HADOOP-17461?focusedWorklogId=785213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785213
]
ASF GitHub Bot logged work on HADOOP-17461:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jun/22 19:04
Start Date: 27/Jun/22 19:04
Worklog Time Spent: 10m
Work Description: steveloughran commented on code in PR #4352:
URL: https://github.com/apache/hadoop/pull/4352#discussion_r907678976
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
Review Comment:
The javadocs here don't match the latest code.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(IOStatisticsContext.class);
+ private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+ private static final WeakReferenceThreadMap<IOStatisticsContext>
+ ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+ IOStatisticsContext::createNewInstance,
Review Comment:
nit, indent this and L56
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(IOStatisticsContext.class);
+ private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+ private static final WeakReferenceThreadMap<IOStatisticsContext>
+ ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+ IOStatisticsContext::createNewInstance,
+ IOStatisticsContext::referenceLostContext
+ );
+
+ /**
+ * Collecting IOStatistics per thread.
+ */
+ private final WeakReferenceThreadMap<IOStatisticsAggregator>
+ threadIOStatsContext = new WeakReferenceThreadMap<>(
+ this::getIOStatisticsAggregatorFactory,
+ this::referenceLost);
+
+ static {
+ // Work out if the current context have thread level IOStatistics enabled.
Review Comment:
nit: "have" should be "has".
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(IOStatisticsContext.class);
+ private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+ private static final WeakReferenceThreadMap<IOStatisticsContext>
+ ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+ IOStatisticsContext::createNewInstance,
+ IOStatisticsContext::referenceLostContext
+ );
+
+ /**
+ * Collecting IOStatistics per thread.
+ */
+ private final WeakReferenceThreadMap<IOStatisticsAggregator>
+ threadIOStatsContext = new WeakReferenceThreadMap<>(
+ this::getIOStatisticsAggregatorFactory,
+ this::referenceLost);
+
+ static {
+ // Work out if the current context have thread level IOStatistics enabled.
+ final Configuration configuration = new Configuration();
+ IS_THREAD_IOSTATS_ENABLED =
+ configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+ THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+ }
+
+ /**
+ * Creating a new IOStatisticsContext instance for a FS to be used.
+ *
+ * @param key Thread ID that represents which thread the context belongs to.
+ * @return an instance of IOStatisticsContext.
+ */
+ private static IOStatisticsContext createNewInstance(Long key) {
+ return new IOStatisticsContext();
+ }
+
+ /**
+ * Get the current IOStatisticsContext.
+ *
+ * @return current IOStatisticsContext instance.
+ */
+ public static IOStatisticsContext currentIOStatisticsContext() {
+ return ACTIVE_IOSTATS_CONTEXT.get(Thread.currentThread().getId());
+ }
+
+ /**
+ * A Method to act as an IOStatisticsSnapshot factory, in a
+ * WeakReferenceThreadMap.
+ *
+ * @param key ThreadID.
+ * @return an Instance of IOStatisticsSnapshot.
+ */
+ private IOStatisticsAggregator getIOStatisticsAggregatorFactory(Long key) {
+ return getThreadIOStatistics();
+ }
+
+ /**
+ * In case of reference loss.
+ *
+ * @param key ThreadID.
+ */
+ private void referenceLost(Long key) {
+ LOG.debug("Reference lost for threadID: {}", key);
+ }
+
+ /**
+ * In case of reference loss for IOStatisticsContext.
+ *
+ * @param key ThreadID.
+ */
+ private static void referenceLostContext(Long key) {
+ LOG.debug("Reference lost for threadID for the context: {}", key);
+ }
+
+ /**
+ * A Method to get the IOStatisticsAggregator of the currentThread. This
+ * denotes the aggregated IOStatistics per thread.
+ *
+ * @return the instance of IOStatisticsAggregator for the current thread.
+ */
+ public IOStatisticsAggregator getThreadIOStatistics() {
+ // If Thread IOStats is disabled we return an emptyIOStatistics instance
+ // back.
+ if (!IS_THREAD_IOSTATS_ENABLED) {
+ return EmptyIOStatisticsStore.getInstance();
+ }
+
+ // If the current Thread ID already have an aggregator assigned, return
+ // that.
+ boolean isThreadIOStatsPresent =
+ threadIOStatsContext.containsKey(Thread.currentThread().getId());
+ if (isThreadIOStatsPresent) {
+ return threadIOStatsContext.getForCurrentThread();
+ }
+
+ // If no aggregator is defined to the thread ID, create one and assign it.
+ IOStatisticsSnapshot ioStatisticsSnapshot = new IOStatisticsSnapshot();
+ setThreadIOStatistics(ioStatisticsSnapshot);
+ return ioStatisticsSnapshot;
+ }
+
+ /**
+ * Set the IOStatisticsAggregator for the current context for a specific
+ * thread.
+ *
+ * @param ioStatisticsAggregator IOStatisticsAggregator instance for the
+ * current thread.
+ */
+ public void setThreadIOStatistics(
+ IOStatisticsAggregator ioStatisticsAggregator) {
+ threadIOStatsContext.setForCurrentThread(ioStatisticsAggregator);
+ }
+
+ /**
+ * Returns a snapshot of the current thread's IOStatistics.
+ *
+ * @return IOStatisticsSnapshot of the current thread.
+ */
+ public IOStatisticsSnapshot getThreadIOStatisticsSnapshot() {
+ if (IS_THREAD_IOSTATS_ENABLED) {
+ return (IOStatisticsSnapshot) getThreadIOStatistics();
+ }
+ return new IOStatisticsSnapshot();
+ }
+
+ /**
+ * Get thread ID specific IOStatistics values.
+ *
+ * @param testThreadId thread ID.
+ * @return IOStatistics instance.
+ */
+ @VisibleForTesting
+ public IOStatistics getThreadIOStatistics(long testThreadId) {
+ LOG.info("IOStatsContext thread ID required: {}", testThreadId);
Review Comment:
Should this be logged at info, or at debug?
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContext.currentIOStatisticsContext;
+
+/**
+ * Tests to verify the Thread-level IOStatistics.
+ */
+public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
+
+ private static final int SMALL_THREADS = 2;
+ private static final int BYTES_BIG = 100;
+ private static final int BYTES_SMALL = 50;
+ private static final long TEST_THREAD_ID = Thread.currentThread().getId();
+
+ /**
+ * Run this before the tests once, to note down some work in the
+ * constructor thread to be verified later on in a test.
+ */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // Do some work in constructor thread.
+ S3AFileSystem fs = new S3AFileSystem();
+ Configuration conf = new Configuration();
+ fs.initialize(new URI(conf.get(TEST_FS_S3A_NAME)), conf);
+ Path path = new Path("testConstructor");
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write('a');
+ }
+ try (FSDataInputStream in = fs.open(path)) {
+ in.read();
+ }
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ removeBaseAndBucketOverrides(configuration,
+ THREAD_LEVEL_IOSTATISTICS_ENABLED);
+ return configuration;
+ }
+
+ /**
+ * Verify that S3AInputStream aggregates per thread IOStats collection
+ * correctly.
+ */
+ @Test
+ public void testS3AInputStreamIOStatisticsContext()
+ throws Exception {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path(getMethodName());
+ byte[] data = dataset(256, 'a', 'z');
+ byte[] readDataFirst = new byte[BYTES_BIG];
+ byte[] readDataSecond = new byte[BYTES_SMALL];
+ writeDataset(fs, path, data, data.length, 1024, true);
+
+ final ExecutorService executor =
+ HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+ CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+ try {
+ for (int i = 0; i < SMALL_THREADS; i++) {
+ executor.submit(() -> {
+ try {
+ IOStatistics ioStatisticsFirst;
+ try (FSDataInputStream in = fs.open(path)) {
+ in.seek(50);
+ in.read(readDataFirst);
+ in.close();
+ ioStatisticsFirst = assertThreadStatisticsBytesRead(in,
+ BYTES_BIG);
+ }
+ // Stream is closed for a thread. Re-open and do more operations.
+ IOStatistics ioStatisticsSecond;
+ try (FSDataInputStream in = fs.open(path)) {
+ in.seek(100);
+ in.read(readDataSecond);
+ in.close();
+ ioStatisticsSecond = assertThreadStatisticsBytesRead(in,
+ BYTES_BIG + BYTES_SMALL);
+ }
+ latch.countDown();
+ } catch (Exception e) {
+ latch.countDown();
+ setFutureException(e);
+ LOG.error("An error occurred while doing a task in the thread", e);
+ } catch (AssertionError ase) {
+ latch.countDown();
+ setFutureAse(ase);
+ throw ase;
+ }
+ });
+ }
+ // wait for tasks to finish.
+ latch.await();
+ } finally {
+ executor.shutdown();
+ }
+
+ // Check if an Excp or ASE was caught while the test threads were running.
+ maybeReThrowFutureException();
+ maybeReThrowFutureASE();
+
+ }
+
+ /**
+ * Verify that S3ABlockOutputStream aggregates per thread IOStats collection
+ * correctly.
+ */
+ @Test
+ public void testS3ABlockOutputStreamIOStatisticsContext()
+ throws Exception {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path(getMethodName());
+ byte[] writeDataFirst = new byte[BYTES_BIG];
+ byte[] writeDataSecond = new byte[BYTES_SMALL];
+
+ final ExecutorService executor =
+ HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+ CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+ try {
+ for (int i = 0; i < SMALL_THREADS; i++) {
+ executor.submit(() -> {
+ try {
+ IOStatistics ioStatisticsFirst;
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write(writeDataFirst);
+ out.close();
+ ioStatisticsFirst = assertThreadStatisticsBytesWrite(out,
+ BYTES_BIG);
+ }
+ // Stream is closed for a thread. Re-open and do more operations.
+ IOStatistics ioStatisticsSecond;
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write(writeDataSecond);
+ out.close();
+ ioStatisticsSecond = assertThreadStatisticsBytesWrite(out,
+ BYTES_BIG + BYTES_SMALL);
+ }
+ latch.countDown();
+ } catch (Exception e) {
+ latch.countDown();
+ setFutureException(e);
+ LOG.error("An error occurred while doing a task in the thread", e);
+ } catch (AssertionError ase) {
+ latch.countDown();
+ setFutureAse(ase);
+ throw ase;
+ }
+ });
+ }
+ // wait for tasks to finish.
+ latch.await();
+ } finally {
+ executor.shutdown();
+ }
+
+ // Check if an Excp or ASE was caught while the test threads were running.
+ maybeReThrowFutureException();
+ maybeReThrowFutureASE();
+
+ }
+
+ /**
+ * Verify stats collection and aggregation for constructor thread, Junit
+ * thread and a worker thread.
+ */
+ @Test
+ public void testThreadIOStatisticsForDifferentThreads()
+ throws IOException, InterruptedException {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path(getMethodName());
+ byte[] data = new byte[BYTES_BIG];
+ long threadIdForTest = Thread.currentThread().getId();
+
+ // Write in the Junit thread.
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write(data);
+ }
+
+ // Read in the Junit thread.
+ try (FSDataInputStream in = fs.open(path)) {
+ in.read(data);
+ }
+
+ // Worker thread work and wait for it to finish.
+ TestWorkerThread workerThread = new TestWorkerThread();
+ long workerThreadID = workerThread.getId();
+ workerThread.start();
+ workerThread.join();
+
+ // Work done in constructor: Wrote and Read 1 byte.
+ // Work done in Junit thread: Wrote and Read BYTES_BIG bytes.
+ // Work done in Junit's worker thread: Wrote and Read BYTES_SMALL bytes.
+ assertThreadStatisticsForThread(TEST_THREAD_ID, 1);
+ assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG);
+ assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL);
+
+ }
+
+ /**
+ * Assert bytes wrote by the current thread.
+ *
+ * @param out OutputStream.
+ * @param writeBytes expected bytes.
+ * @return IOStatistics for this stream.
+ */
+ private IOStatistics assertThreadStatisticsBytesWrite(FSDataOutputStream out,
+ int writeBytes) {
+ S3ABlockOutputStream s3aOut = (S3ABlockOutputStream) out.getWrappedStream();
+ IOStatistics ioStatistics =
+ (IOStatisticsSnapshot) s3aOut.getThreadIOStatistics();
+ IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+ StreamStatisticNames.STREAM_WRITE_BYTES)
+ .describedAs("Bytes wrote are not as expected")
+ .isEqualTo(writeBytes);
+
+ return ioStatistics;
+ }
+
+ /**
+ * Assert bytes read by the current thread.
+ *
+ * @param in InputStream.
+ * @param readBytes expected bytes.
+ * @return IOStatistics for this stream.
+ */
+ private IOStatistics assertThreadStatisticsBytesRead(FSDataInputStream in,
+ int readBytes) {
+ S3AInputStream s3AInputStream =
+ (S3AInputStream) in.getWrappedStream();
+ IOStatistics ioStatistics = s3AInputStream.getThreadIOStatistics();
+ IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+ StreamStatisticNames.STREAM_READ_BYTES)
+ .describedAs("Bytes read are not as expected")
+ .isEqualTo(readBytes);
+
+ return ioStatistics;
+ }
+
+ /**
+ * Assert fixed bytes wrote and read for a particular thread ID.
+ *
+ * @param testThreadId thread ID.
+ * @param expectedBytesWrittenAndRead expected bytes.
+ */
+ private void assertThreadStatisticsForThread(long testThreadId,
+ int expectedBytesWrittenAndRead) {
+ LOG.info("Thread ID to be asserted: {}", testThreadId);
+ IOStatistics ioStatistics =
+ currentIOStatisticsContext().getThreadIOStatistics(testThreadId);
+
+ IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+ StreamStatisticNames.STREAM_WRITE_BYTES)
+ .describedAs("Bytes wrote are not as expected for thread :{}",
+ testThreadId)
+ .isEqualTo(expectedBytesWrittenAndRead);
+
+ IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+ StreamStatisticNames.STREAM_READ_BYTES)
+ .describedAs("Bytes read are not as expected for thread :{}",
+ testThreadId)
+ .isEqualTo(expectedBytesWrittenAndRead);
+ }
+
+ /**
+ * Simulating doing some work in a separate thread.
+ */
+ private class TestWorkerThread extends Thread implements Runnable {
+ @Override
+ public void run() {
+ S3AFileSystem fs = getFileSystem();
+ Path path = new Path("workerThread");
Review Comment:
Make path a field and pass in the method path in its constructor. I want to be
able to run multiple hadoop-aws test runs in the same bucket ASAP.
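A rough sketch of the shape this could take, not the PR's actual code. To keep it runnable without Hadoop on the classpath, a plain `String` stands in for `org.apache.hadoop.fs.Path`; the `workerPath` field, its getter, and the `WorkerPathSketch` wrapper class are hypothetical names used only for illustration.

```java
/**
 * Sketch only: String stands in for org.apache.hadoop.fs.Path so that the
 * example compiles and runs with nothing but the JDK.
 */
final class WorkerPathSketch {

  /** Worker thread that is told which path to use rather than hard-coding one. */
  static class TestWorkerThread extends Thread {
    /** Path this worker operates on; expected to be unique per test method. */
    private final String workerPath;

    TestWorkerThread(String workerPath) {
      this.workerPath = workerPath;
    }

    String getWorkerPath() {
      return workerPath;
    }

    @Override
    public void run() {
      // The real test would write and then read a file at workerPath here.
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Hypothetical per-method path; in the real test this would presumably
    // come from path(getMethodName()), as the other test cases already do.
    TestWorkerThread worker =
        new TestWorkerThread("test/testThreadIOStatisticsForDifferentThreads");
    worker.start();
    worker.join();
    System.out.println(worker.getWorkerPath());
  }
}
```

With the path supplied by the caller, each test method can hand the worker a path under its own test directory, so concurrent runs against one bucket would not collide on a shared "workerThread" object.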
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(IOStatisticsContext.class);
+ private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+ private static final WeakReferenceThreadMap<IOStatisticsContext>
+ ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+ IOStatisticsContext::createNewInstance,
+ IOStatisticsContext::referenceLostContext
+ );
+
+ /**
+ * Collecting IOStatistics per thread.
+ */
+ private final WeakReferenceThreadMap<IOStatisticsAggregator>
+ threadIOStatsContext = new WeakReferenceThreadMap<>(
+ this::getIOStatisticsAggregatorFactory,
Review Comment:
same; indent the args to the constructor
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java:
##########
@@ -41,6 +41,8 @@
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.classification.VisibleForTesting;
Review Comment:
nit, should go above L43
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -37,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
Review Comment:
Move this down to where the "real" hadoop imports are.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * Implementing the IOStatisticsContext interface.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsAggregator which could be either IOStatisticsSnapshot or
+ * EmptyIOStatisticsStore if thread level aggregation is enabled or not for
+ * the FS. An active instance of the IOStatisticsContext can be used to
+ * collect the statistics.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public class IOStatisticsContext {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(IOStatisticsContext.class);
+ private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+ private static final WeakReferenceThreadMap<IOStatisticsContext>
+ ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>(
+ IOStatisticsContext::createNewInstance,
+ IOStatisticsContext::referenceLostContext
+ );
+
+ /**
+ * Collecting IOStatistics per thread.
+ */
+ private final WeakReferenceThreadMap<IOStatisticsAggregator>
+ threadIOStatsContext = new WeakReferenceThreadMap<>(
+ this::getIOStatisticsAggregatorFactory,
+ this::referenceLost);
+
+ static {
+ // Work out if the current context have thread level IOStatistics enabled.
+ final Configuration configuration = new Configuration();
+ IS_THREAD_IOSTATS_ENABLED =
+ configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+ THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+ }
+
+ /**
+ * Creating a new IOStatisticsContext instance for a FS to be used.
+ *
+ * @param key Thread ID that represents which thread the context belongs to.
+ * @return an instance of IOStatisticsContext.
+ */
+ private static IOStatisticsContext createNewInstance(Long key) {
+ return new IOStatisticsContext();
+ }
+
+ /**
+ * Get the current IOStatisticsContext.
+ *
+ * @return current IOStatisticsContext instance.
+ */
+ public static IOStatisticsContext currentIOStatisticsContext() {
+ return ACTIVE_IOSTATS_CONTEXT.get(Thread.currentThread().getId());
+ }
+
+ /**
+ * A Method to act as an IOStatisticsSnapshot factory, in a
+ * WeakReferenceThreadMap.
+ *
+ * @param key ThreadID.
+ * @return an Instance of IOStatisticsSnapshot.
+ */
+ private IOStatisticsAggregator getIOStatisticsAggregatorFactory(Long key) {
+ return getThreadIOStatistics();
+ }
+
+ /**
+ * In case of reference loss.
+ *
+ * @param key ThreadID.
+ */
+ private void referenceLost(Long key) {
+ LOG.debug("Reference lost for threadID: {}", key);
+ }
+
+ /**
+ * In case of reference loss for IOStatisticsContext.
+ *
+ * @param key ThreadID.
+ */
+ private static void referenceLostContext(Long key) {
+ LOG.debug("Reference lost for threadID for the context: {}", key);
+ }
+
+ /**
+ * A Method to get the IOStatisticsAggregator of the currentThread. This
+ * denotes the aggregated IOStatistics per thread.
+ *
+ * @return the instance of IOStatisticsAggregator for the current thread.
+ */
+ public IOStatisticsAggregator getThreadIOStatistics() {
+ // If Thread IOStats is disabled we return an emptyIOStatistics instance
+ // back.
+ if (!IS_THREAD_IOSTATS_ENABLED) {
+ return EmptyIOStatisticsStore.getInstance();
+ }
+
+ // If the current Thread ID already has an aggregator assigned, return
+ // that.
+ boolean isThreadIOStatsPresent =
+ threadIOStatsContext.containsKey(Thread.currentThread().getId());
+ if (isThreadIOStatsPresent) {
+ return threadIOStatsContext.getForCurrentThread();
+ }
+
+ // If no aggregator is defined to the thread ID, create one and assign it.
+ IOStatisticsSnapshot ioStatisticsSnapshot = new IOStatisticsSnapshot();
+ setThreadIOStatistics(ioStatisticsSnapshot);
+ return ioStatisticsSnapshot;
+ }
+
+ /**
+ * Set the IOStatisticsAggregator for the current context for a specific
+ * thread.
+ *
+ * @param ioStatisticsAggregator IOStatisticsAggregator instance for the
+ * current thread.
+ */
+ public void setThreadIOStatistics(
+ IOStatisticsAggregator ioStatisticsAggregator) {
+ threadIOStatsContext.setForCurrentThread(ioStatisticsAggregator);
+ }
+
+ /**
+ * Returns a snapshot of the current thread's IOStatistics.
+ *
+ * @return IOStatisticsSnapshot of the current thread.
+ */
+ public IOStatisticsSnapshot getThreadIOStatisticsSnapshot() {
Review Comment:
How does this get used? It is not a snapshot if another thread is
actively updating the statistics.
If this is meant to return a copy of the stats, we should call the method
`snapshotCurrentThreadIOStatistics()` and always return a snapshot: either an
empty one or one of the current thread's stats.
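A minimal sketch of what "always return a snapshot" could look like. It uses a hypothetical stand-in class, `ThreadStatsSketch`, with plain JDK counters rather than the real `IOStatisticsContext` or `IOStatisticsSnapshot` types, so every name except `snapshotCurrentThreadIOStatistics()` is an assumption made for illustration:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a per-thread statistics context; not the Hadoop class. */
final class ThreadStatsSketch {

  /** Live counters, which may keep changing after a caller has read them. */
  private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  /** Assumed equivalent of the "thread-level statistics enabled" switch. */
  private final boolean enabled;

  ThreadStatsSketch(boolean enabled) {
    this.enabled = enabled;
  }

  /** Add to a live counter; a no-op when collection is disabled. */
  void increment(String key, long delta) {
    if (enabled) {
      counters.computeIfAbsent(key, k -> new AtomicLong()).addAndGet(delta);
    }
  }

  /**
   * Always returns a detached copy: empty when collection is disabled,
   * otherwise the counter values at the time of the call.
   */
  Map<String, Long> snapshotCurrentThreadIOStatistics() {
    Map<String, Long> copy = new TreeMap<>();
    if (enabled) {
      counters.forEach((key, value) -> copy.put(key, value.get()));
    }
    return copy;
  }
}
```

Because the returned map is a copy, later updates to the live counters do not change a snapshot that a caller already holds, which is the property the current getter cannot guarantee.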
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContext.currentIOStatisticsContext;
+
+/**
+ * Tests to verify the Thread-level IOStatistics.
+ */
+public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
+
+ private static final int SMALL_THREADS = 2;
+ private static final int BYTES_BIG = 100;
+ private static final int BYTES_SMALL = 50;
+ private static final long TEST_THREAD_ID = Thread.currentThread().getId();
+
+ /**
+ * Run this before the tests once, to note down some work in the
+ * constructor thread to be verified later on in a test.
+ */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // Do some work in constructor thread.
+ S3AFileSystem fs = new S3AFileSystem();
+ Configuration conf = new Configuration();
+ fs.initialize(new URI(conf.get(TEST_FS_S3A_NAME)), conf);
+ Path path = new Path("testConstructor");
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write('a');
+ }
+ try (FSDataInputStream in = fs.open(path)) {
+ in.read();
+ }
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ removeBaseAndBucketOverrides(configuration,
+ THREAD_LEVEL_IOSTATISTICS_ENABLED);
+ return configuration;
+ }
+
+ /**
+ * Verify that S3AInputStream aggregates per thread IOStats collection
+ * correctly.
+ */
+ @Test
+ public void testS3AInputStreamIOStatisticsContext()
+ throws Exception {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path(getMethodName());
+ byte[] data = dataset(256, 'a', 'z');
+ byte[] readDataFirst = new byte[BYTES_BIG];
+ byte[] readDataSecond = new byte[BYTES_SMALL];
+ writeDataset(fs, path, data, data.length, 1024, true);
+
+ final ExecutorService executor =
+ HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+ CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+ try {
+ for (int i = 0; i < SMALL_THREADS; i++) {
+ executor.submit(() -> {
+ try {
+ IOStatistics ioStatisticsFirst;
+ try (FSDataInputStream in = fs.open(path)) {
+ in.seek(50);
+ in.read(readDataFirst);
+ in.close();
+ ioStatisticsFirst = assertThreadStatisticsBytesRead(in,
+ BYTES_BIG);
+ }
+ // Stream is closed for a thread. Re-open and do more operations.
+ IOStatistics ioStatisticsSecond;
+ try (FSDataInputStream in = fs.open(path)) {
+ in.seek(100);
+ in.read(readDataSecond);
+ in.close();
+ ioStatisticsSecond = assertThreadStatisticsBytesRead(in,
+ BYTES_BIG + BYTES_SMALL);
+ }
+ latch.countDown();
+ } catch (Exception e) {
+ latch.countDown();
+ setFutureException(e);
+ LOG.error("An error occurred while doing a task in the thread", e);
+ } catch (AssertionError ase) {
+ latch.countDown();
+ setFutureAse(ase);
+ throw ase;
+ }
+ });
+ }
+ // wait for tasks to finish.
+ latch.await();
+ } finally {
+ executor.shutdown();
+ }
+
+ // Check if an Excp or ASE was caught while the test threads were running.
+ maybeReThrowFutureException();
+ maybeReThrowFutureASE();
+
+ }
+
+ /**
+ * Verify that S3ABlockOutputStream aggregates per thread IOStats collection
+ * correctly.
+ */
+ @Test
+ public void testS3ABlockOutputStreamIOStatisticsContext()
+ throws Exception {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path(getMethodName());
+ byte[] writeDataFirst = new byte[BYTES_BIG];
+ byte[] writeDataSecond = new byte[BYTES_SMALL];
+
+ final ExecutorService executor =
+ HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+ CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+ try {
+ for (int i = 0; i < SMALL_THREADS; i++) {
+ executor.submit(() -> {
+ try {
+ IOStatistics ioStatisticsFirst;
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write(writeDataFirst);
+ out.close();
+ ioStatisticsFirst = assertThreadStatisticsBytesWrite(out,
+ BYTES_BIG);
+ }
+ // Stream is closed for a thread. Re-open and do more operations.
+ IOStatistics ioStatisticsSecond;
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write(writeDataSecond);
+ out.close();
+ ioStatisticsSecond = assertThreadStatisticsBytesWrite(out,
+ BYTES_BIG + BYTES_SMALL);
+ }
+ latch.countDown();
+ } catch (Exception e) {
+ latch.countDown();
+ setFutureException(e);
+ LOG.error("An error occurred while doing a task in the thread", e);
+ } catch (AssertionError ase) {
+ latch.countDown();
+ setFutureAse(ase);
+ throw ase;
+ }
+ });
+ }
+ // wait for tasks to finish.
+ latch.await();
+ } finally {
+ executor.shutdown();
+ }
+
+ // Check if an Excp or ASE was caught while the test threads were running.
+ maybeReThrowFutureException();
+ maybeReThrowFutureASE();
+
+ }
+
+ /**
+ * Verify stats collection and aggregation for constructor thread, Junit
+ * thread and a worker thread.
+ */
+ @Test
+ public void testThreadIOStatisticsForDifferentThreads()
+ throws IOException, InterruptedException {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path(getMethodName());
+ byte[] data = new byte[BYTES_BIG];
+ long threadIdForTest = Thread.currentThread().getId();
+
+ // Write in the Junit thread.
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write(data);
+ }
+
+ // Read in the Junit thread.
+ try (FSDataInputStream in = fs.open(path)) {
+ in.read(data);
+ }
+
+ // Worker thread work and wait for it to finish.
+ TestWorkerThread workerThread = new TestWorkerThread();
+ long workerThreadID = workerThread.getId();
+ workerThread.start();
+ workerThread.join();
+
+ // Work done in constructor: Wrote and Read 1 byte.
+ // Work done in Junit thread: Wrote and Read BYTES_BIG bytes.
+ // Work done in Junit's worker thread: Wrote and Read BYTES_SMALL bytes.
+ assertThreadStatisticsForThread(TEST_THREAD_ID, 1);
+ assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG);
+ assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL);
+
+ }
+
+ /**
+ * Assert bytes wrote by the current thread.
+ *
+ * @param out OutputStream.
+ * @param writeBytes expected bytes.
+ * @return IOStatistics for this stream.
+ */
+ private IOStatistics assertThreadStatisticsBytesWrite(FSDataOutputStream out,
+ int writeBytes) {
+ S3ABlockOutputStream s3aOut = (S3ABlockOutputStream) out.getWrappedStream();
+ IOStatistics ioStatistics =
+ (IOStatisticsSnapshot) s3aOut.getThreadIOStatistics();
+ IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+ StreamStatisticNames.STREAM_WRITE_BYTES)
+ .describedAs("Bytes wrote are not as expected")
+ .isEqualTo(writeBytes);
+
+ return ioStatistics;
+ }
+
+ /**
+ * Assert bytes read by the current thread.
+ *
+ * @param in InputStream.
+ * @param readBytes expected bytes.
+ * @return IOStatistics for this stream.
+ */
+ private IOStatistics assertThreadStatisticsBytesRead(FSDataInputStream in,
+ int readBytes) {
+ S3AInputStream s3AInputStream =
+ (S3AInputStream) in.getWrappedStream();
+ IOStatistics ioStatistics = s3AInputStream.getThreadIOStatistics();
+ IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+ StreamStatisticNames.STREAM_READ_BYTES)
+ .describedAs("Bytes read are not as expected")
+ .isEqualTo(readBytes);
+
+ return ioStatistics;
+ }
+
+ /**
+ * Assert fixed bytes wrote and read for a particular thread ID.
+ *
+ * @param testThreadId thread ID.
+ * @param expectedBytesWrittenAndRead expected bytes.
+ */
+ private void assertThreadStatisticsForThread(long testThreadId,
+ int expectedBytesWrittenAndRead) {
+ LOG.info("Thread ID to be asserted: {}", testThreadId);
+ IOStatistics ioStatistics =
+ currentIOStatisticsContext().getThreadIOStatistics(testThreadId);
+
+ IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
+ StreamStatisticNames.STREAM_WRITE_BYTES)
+ .describedAs("Bytes wrote are not as expected for thread :{}",
Review Comment:
nit "written"
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -124,13 +124,15 @@
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
Review Comment:
nit, move down to L133
Issue Time Tracking
-------------------
Worklog Id: (was: 785213)
Time Spent: 2.5h (was: 2h 20m)
> Add thread-level IOStatistics Context
> -------------------------------------
>
> Key: HADOOP-17461
> URL: https://issues.apache.org/jira/browse/HADOOP-17461
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs, fs/azure, fs/s3
> Affects Versions: 3.3.1
> Reporter: Steve Loughran
> Assignee: Mehakmeet Singh
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> For effective reporting of the iostatistics of individual worker threads, we
> need a thread-level context which IO components update.
> * this context needs to be passed in to background threads performing work on
> behalf of a task.
> * IO Components (streams, iterators, filesystems) need to update this context's
> statistics as they perform work
> * Without double counting anything.
> I imagine a ThreadLocal IOStatisticContext which will be updated in the
> FileSystem API Calls. This context MUST be passed into the background threads
> used by a task, so that IO is correctly aggregated.
> I don't want streams, listIterators &c to do the updating as there is more
> risk of double counting. However, we need to see their statistics if we want
> to know things like "bytes discarded in backwards seeks". And I don't want to
> be updating a shared context object on every read() call.
> If all we want is store IO (HEAD, GET, DELETE, list performance etc) then the
> FS is sufficient.
> If we do want the stream-specific detail, then I propose
> * caching the context in the constructor
> * updating it only in close() or unbuffer() (as we do from S3AInputStream to
> S3AInstrumentation)
> * excluding those we know the FS already collects.
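
The stream-specific proposal above (cache the context in the constructor, update it only in close()) can be sketched roughly as follows. `ContextSketch` and `CountingStreamSketch` are hypothetical names using plain JDK types, not Hadoop classes, and the sketch only illustrates the update pattern:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical shared context; stands in for the thread-level statistics context. */
final class ContextSketch {
  private final AtomicLong bytesRead = new AtomicLong();

  void addBytesRead(long count) {
    bytesRead.addAndGet(count);
  }

  long getBytesRead() {
    return bytesRead.get();
  }
}

/** Hypothetical stream that counts locally and publishes its total once, on close. */
final class CountingStreamSketch implements AutoCloseable {
  private final byte[] data;
  private final ContextSketch context; // cached in the constructor
  private int position;
  private long localBytesRead;         // stream-private, so read() stays cheap
  private boolean closed;

  CountingStreamSketch(byte[] data, ContextSketch context) {
    this.data = data;
    this.context = context;
  }

  /** Reads one byte, or returns -1 at end of data or after close. */
  int read() {
    if (closed || position >= data.length) {
      return -1;
    }
    localBytesRead++;
    return data[position++] & 0xff;
  }

  /** Publishes the local count exactly once, so a repeated close() cannot double count. */
  @Override
  public void close() {
    if (!closed) {
      closed = true;
      context.addBytesRead(localBytesRead);
    }
  }
}
```

The shared context is never touched on the read() path, and the closed flag keeps a second close() from merging the same bytes again.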
--
This message was sent by Atlassian Jira
(v8.20.7#820007)