steveloughran commented on code in PR #4352: URL: https://github.com/apache/hadoop/pull/4352#discussion_r907678976
########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext interface. Review Comment: the javadocs here don't match the latest code: they describe implementing an IOStatisticsContext interface, but IOStatisticsContext is now a concrete class.
########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext interface. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsAggregator which could be either IOStatisticsSnapshot or + * EmptyIOStatisticsStore if thread level aggregation is enabled or not for + * the FS. 
An active instance of the IOStatisticsContext can be used to + * collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContext.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>( + IOStatisticsContext::createNewInstance, Review Comment: nit, indent this and L56 ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext interface. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsAggregator which could be either IOStatisticsSnapshot or + * EmptyIOStatisticsStore if thread level aggregation is enabled or not for + * the FS. An active instance of the IOStatisticsContext can be used to + * collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContext.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>( + IOStatisticsContext::createNewInstance, + IOStatisticsContext::referenceLostContext + ); + + /** + * Collecting IOStatistics per thread. 
+ */ + private final WeakReferenceThreadMap<IOStatisticsAggregator> + threadIOStatsContext = new WeakReferenceThreadMap<>( + this::getIOStatisticsAggregatorFactory, + this::referenceLost); + + static { + // Work out if the current context have thread level IOStatistics enabled. Review Comment: nit: "have" should be "has". ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext interface.
+ * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsAggregator which could be either IOStatisticsSnapshot or + * EmptyIOStatisticsStore if thread level aggregation is enabled or not for + * the FS. An active instance of the IOStatisticsContext can be used to + * collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContext.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>( + IOStatisticsContext::createNewInstance, + IOStatisticsContext::referenceLostContext + ); + + /** + * Collecting IOStatistics per thread. + */ + private final WeakReferenceThreadMap<IOStatisticsAggregator> + threadIOStatsContext = new WeakReferenceThreadMap<>( + this::getIOStatisticsAggregatorFactory, + this::referenceLost); + + static { + // Work out if the current context have thread level IOStatistics enabled. + final Configuration configuration = new Configuration(); + IS_THREAD_IOSTATS_ENABLED = + configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED, + THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT); + } + + /** + * Creating a new IOStatisticsContext instance for a FS to be used. + * + * @param key Thread ID that represents which thread the context belongs to. + * @return an instance of IOStatisticsContext. + */ + private static IOStatisticsContext createNewInstance(Long key) { + return new IOStatisticsContext(); + } + + /** + * Get the current IOStatisticsContext. + * + * @return current IOStatisticsContext instance. 
+ */ + public static IOStatisticsContext currentIOStatisticsContext() { + return ACTIVE_IOSTATS_CONTEXT.get(Thread.currentThread().getId()); + } + + /** + * A Method to act as an IOStatisticsSnapshot factory, in a + * WeakReferenceThreadMap. + * + * @param key ThreadID. + * @return an Instance of IOStatisticsSnapshot. + */ + private IOStatisticsAggregator getIOStatisticsAggregatorFactory(Long key) { + return getThreadIOStatistics(); + } + + /** + * In case of reference loss. + * + * @param key ThreadID. + */ + private void referenceLost(Long key) { + LOG.debug("Reference lost for threadID: {}", key); + } + + /** + * In case of reference loss for IOStatisticsContext. + * + * @param key ThreadID. + */ + private static void referenceLostContext(Long key) { + LOG.debug("Reference lost for threadID for the context: {}", key); + } + + /** + * A Method to get the IOStatisticsAggregator of the currentThread. This + * denotes the aggregated IOStatistics per thread. + * + * @return the instance of IOStatisticsAggregator for the current thread. + */ + public IOStatisticsAggregator getThreadIOStatistics() { + // If Thread IOStats is disabled we return an emptyIOStatistics instance + // back. + if (!IS_THREAD_IOSTATS_ENABLED) { + return EmptyIOStatisticsStore.getInstance(); + } + + // If the current Thread ID already have an aggregator assigned, return + // that. + boolean isThreadIOStatsPresent = + threadIOStatsContext.containsKey(Thread.currentThread().getId()); + if (isThreadIOStatsPresent) { + return threadIOStatsContext.getForCurrentThread(); + } + + // If no aggregator is defined to the thread ID, create one and assign it. + IOStatisticsSnapshot ioStatisticsSnapshot = new IOStatisticsSnapshot(); + setThreadIOStatistics(ioStatisticsSnapshot); + return ioStatisticsSnapshot; + } + + /** + * Set the IOStatisticsAggregator for the current context for a specific + * thread. + * + * @param ioStatisticsAggregator IOStatisticsAggregator instance for the + * current thread. 
+ */ + public void setThreadIOStatistics( + IOStatisticsAggregator ioStatisticsAggregator) { + threadIOStatsContext.setForCurrentThread(ioStatisticsAggregator); + } + + /** + * Returns a snapshot of the current thread's IOStatistics. + * + * @return IOStatisticsSnapshot of the current thread. + */ + public IOStatisticsSnapshot getThreadIOStatisticsSnapshot() { + if (IS_THREAD_IOSTATS_ENABLED) { + return (IOStatisticsSnapshot) getThreadIOStatistics(); + } + return new IOStatisticsSnapshot(); + } + + /** + * Get thread ID specific IOStatistics values. + * + * @param testThreadId thread ID. + * @return IOStatistics instance. + */ + @VisibleForTesting + public IOStatistics getThreadIOStatistics(long testThreadId) { + LOG.info("IOStatsContext thread ID required: {}", testThreadId); Review Comment: should this be logged at info or at debug level? ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java: ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.statistics.IOStatisticAssertions; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContext.currentIOStatisticsContext; + +/** + * Tests to verify the Thread-level IOStatistics. + */ +public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase { + + private static final int SMALL_THREADS = 2; + private static final int BYTES_BIG = 100; + private static final int BYTES_SMALL = 50; + private static final long TEST_THREAD_ID = Thread.currentThread().getId(); + + /** + * Run this before the tests once, to note down some work in the + * constructor thread to be verified later on in a test. + */ + @BeforeClass + public static void beforeClass() throws Exception { + // Do some work in constructor thread. 
+ S3AFileSystem fs = new S3AFileSystem(); + Configuration conf = new Configuration(); + fs.initialize(new URI(conf.get(TEST_FS_S3A_NAME)), conf); + Path path = new Path("testConstructor"); + try (FSDataOutputStream out = fs.create(path)) { + out.write('a'); + } + try (FSDataInputStream in = fs.open(path)) { + in.read(); + } + } + + @Override + protected Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + removeBaseAndBucketOverrides(configuration, + THREAD_LEVEL_IOSTATISTICS_ENABLED); + return configuration; + } + + /** + * Verify that S3AInputStream aggregates per thread IOStats collection + * correctly. + */ + @Test + public void testS3AInputStreamIOStatisticsContext() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = dataset(256, 'a', 'z'); + byte[] readDataFirst = new byte[BYTES_BIG]; + byte[] readDataSecond = new byte[BYTES_SMALL]; + writeDataset(fs, path, data, data.length, 1024, true); + + final ExecutorService executor = + HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + CountDownLatch latch = new CountDownLatch(SMALL_THREADS); + + try { + for (int i = 0; i < SMALL_THREADS; i++) { + executor.submit(() -> { + try { + IOStatistics ioStatisticsFirst; + try (FSDataInputStream in = fs.open(path)) { + in.seek(50); + in.read(readDataFirst); + in.close(); + ioStatisticsFirst = assertThreadStatisticsBytesRead(in, + BYTES_BIG); + } + // Stream is closed for a thread. Re-open and do more operations. 
+ IOStatistics ioStatisticsSecond; + try (FSDataInputStream in = fs.open(path)) { + in.seek(100); + in.read(readDataSecond); + in.close(); + ioStatisticsSecond = assertThreadStatisticsBytesRead(in, + BYTES_BIG + BYTES_SMALL); + } + latch.countDown(); + } catch (Exception e) { + latch.countDown(); + setFutureException(e); + LOG.error("An error occurred while doing a task in the thread", e); + } catch (AssertionError ase) { + latch.countDown(); + setFutureAse(ase); + throw ase; + } + }); + } + // wait for tasks to finish. + latch.await(); + } finally { + executor.shutdown(); + } + + // Check if an Excp or ASE was caught while the test threads were running. + maybeReThrowFutureException(); + maybeReThrowFutureASE(); + + } + + /** + * Verify that S3ABlockOutputStream aggregates per thread IOStats collection + * correctly. + */ + @Test + public void testS3ABlockOutputStreamIOStatisticsContext() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] writeDataFirst = new byte[BYTES_BIG]; + byte[] writeDataSecond = new byte[BYTES_SMALL]; + + final ExecutorService executor = + HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + CountDownLatch latch = new CountDownLatch(SMALL_THREADS); + + try { + for (int i = 0; i < SMALL_THREADS; i++) { + executor.submit(() -> { + try { + IOStatistics ioStatisticsFirst; + try (FSDataOutputStream out = fs.create(path)) { + out.write(writeDataFirst); + out.close(); + ioStatisticsFirst = assertThreadStatisticsBytesWrite(out, + BYTES_BIG); + } + // Stream is closed for a thread. Re-open and do more operations. 
+ IOStatistics ioStatisticsSecond; + try (FSDataOutputStream out = fs.create(path)) { + out.write(writeDataSecond); + out.close(); + ioStatisticsSecond = assertThreadStatisticsBytesWrite(out, + BYTES_BIG + BYTES_SMALL); + } + latch.countDown(); + } catch (Exception e) { + latch.countDown(); + setFutureException(e); + LOG.error("An error occurred while doing a task in the thread", e); + } catch (AssertionError ase) { + latch.countDown(); + setFutureAse(ase); + throw ase; + } + }); + } + // wait for tasks to finish. + latch.await(); + } finally { + executor.shutdown(); + } + + // Check if an Excp or ASE was caught while the test threads were running. + maybeReThrowFutureException(); + maybeReThrowFutureASE(); + + } + + /** + * Verify stats collection and aggregation for constructor thread, Junit + * thread and a worker thread. + */ + @Test + public void testThreadIOStatisticsForDifferentThreads() + throws IOException, InterruptedException { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = new byte[BYTES_BIG]; + long threadIdForTest = Thread.currentThread().getId(); + + // Write in the Junit thread. + try (FSDataOutputStream out = fs.create(path)) { + out.write(data); + } + + // Read in the Junit thread. + try (FSDataInputStream in = fs.open(path)) { + in.read(data); + } + + // Worker thread work and wait for it to finish. + TestWorkerThread workerThread = new TestWorkerThread(); + long workerThreadID = workerThread.getId(); + workerThread.start(); + workerThread.join(); + + // Work done in constructor: Wrote and Read 1 byte. + // Work done in Junit thread: Wrote and Read BYTES_BIG bytes. + // Work done in Junit's worker thread: Wrote and Read BYTES_SMALL bytes. + assertThreadStatisticsForThread(TEST_THREAD_ID, 1); + assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG); + assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL); + + } + + /** + * Assert bytes wrote by the current thread. 
+ * + * @param out OutputStream. + * @param writeBytes expected bytes. + * @return IOStatistics for this stream. + */ + private IOStatistics assertThreadStatisticsBytesWrite(FSDataOutputStream out, + int writeBytes) { + S3ABlockOutputStream s3aOut = (S3ABlockOutputStream) out.getWrappedStream(); + IOStatistics ioStatistics = + (IOStatisticsSnapshot) s3aOut.getThreadIOStatistics(); + IOStatisticAssertions.assertThatStatisticCounter(ioStatistics, + StreamStatisticNames.STREAM_WRITE_BYTES) + .describedAs("Bytes wrote are not as expected") + .isEqualTo(writeBytes); + + return ioStatistics; + } + + /** + * Assert bytes read by the current thread. + * + * @param in InputStream. + * @param readBytes expected bytes. + * @return IOStatistics for this stream. + */ + private IOStatistics assertThreadStatisticsBytesRead(FSDataInputStream in, + int readBytes) { + S3AInputStream s3AInputStream = + (S3AInputStream) in.getWrappedStream(); + IOStatistics ioStatistics = s3AInputStream.getThreadIOStatistics(); + IOStatisticAssertions.assertThatStatisticCounter(ioStatistics, + StreamStatisticNames.STREAM_READ_BYTES) + .describedAs("Bytes read are not as expected") + .isEqualTo(readBytes); + + return ioStatistics; + } + + /** + * Assert fixed bytes wrote and read for a particular thread ID. + * + * @param testThreadId thread ID. + * @param expectedBytesWrittenAndRead expected bytes. 
+ */ + private void assertThreadStatisticsForThread(long testThreadId, + int expectedBytesWrittenAndRead) { + LOG.info("Thread ID to be asserted: {}", testThreadId); + IOStatistics ioStatistics = + currentIOStatisticsContext().getThreadIOStatistics(testThreadId); + + IOStatisticAssertions.assertThatStatisticCounter(ioStatistics, + StreamStatisticNames.STREAM_WRITE_BYTES) + .describedAs("Bytes wrote are not as expected for thread :{}", + testThreadId) + .isEqualTo(expectedBytesWrittenAndRead); + + IOStatisticAssertions.assertThatStatisticCounter(ioStatistics, + StreamStatisticNames.STREAM_READ_BYTES) + .describedAs("Bytes read are not as expected for thread :{}", + testThreadId) + .isEqualTo(expectedBytesWrittenAndRead); + } + + /** + * Simulating doing some work in a separate thread. + */ + private class TestWorkerThread extends Thread implements Runnable { + @Override + public void run() { + S3AFileSystem fs = getFileSystem(); + Path path = new Path("workerThread"); Review Comment: make `path` a field and pass in the method path through its constructor. A fixed path such as "workerThread" will collide when several test runs share a bucket, and I want to be able to run multiple hadoop-aws test runs in the same bucket ASAP. ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext interface. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsAggregator which could be either IOStatisticsSnapshot or + * EmptyIOStatisticsStore if thread level aggregation is enabled or not for + * the FS. An active instance of the IOStatisticsContext can be used to + * collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContext.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>( + IOStatisticsContext::createNewInstance, + IOStatisticsContext::referenceLostContext + ); + + /** + * Collecting IOStatistics per thread. 
+ */ + private final WeakReferenceThreadMap<IOStatisticsAggregator> + threadIOStatsContext = new WeakReferenceThreadMap<>( + this::getIOStatisticsAggregatorFactory, Review Comment: same here; indent the arguments to the constructor. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java: ########## @@ -41,6 +41,8 @@ import com.amazonaws.services.s3.model.UploadPartRequest; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.classification.VisibleForTesting; Review Comment: nit: this import should go above L43. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java: ########## @@ -37,6 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; Review Comment: move this import down to where the "real" hadoop imports are. ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.statistics.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext interface. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsAggregator which could be either IOStatisticsSnapshot or + * EmptyIOStatisticsStore if thread level aggregation is enabled or not for + * the FS. An active instance of the IOStatisticsContext can be used to + * collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContext.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = new WeakReferenceThreadMap<>( + IOStatisticsContext::createNewInstance, + IOStatisticsContext::referenceLostContext + ); + + /** + * Collecting IOStatistics per thread. 
+ */ + private final WeakReferenceThreadMap<IOStatisticsAggregator> + threadIOStatsContext = new WeakReferenceThreadMap<>( + this::getIOStatisticsAggregatorFactory, + this::referenceLost); + + static { + // Work out if the current context have thread level IOStatistics enabled. + final Configuration configuration = new Configuration(); + IS_THREAD_IOSTATS_ENABLED = + configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED, + THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT); + } + + /** + * Creating a new IOStatisticsContext instance for a FS to be used. + * + * @param key Thread ID that represents which thread the context belongs to. + * @return an instance of IOStatisticsContext. + */ + private static IOStatisticsContext createNewInstance(Long key) { + return new IOStatisticsContext(); + } + + /** + * Get the current IOStatisticsContext. + * + * @return current IOStatisticsContext instance. + */ + public static IOStatisticsContext currentIOStatisticsContext() { + return ACTIVE_IOSTATS_CONTEXT.get(Thread.currentThread().getId()); + } + + /** + * A Method to act as an IOStatisticsSnapshot factory, in a + * WeakReferenceThreadMap. + * + * @param key ThreadID. + * @return an Instance of IOStatisticsSnapshot. + */ + private IOStatisticsAggregator getIOStatisticsAggregatorFactory(Long key) { + return getThreadIOStatistics(); + } + + /** + * In case of reference loss. + * + * @param key ThreadID. + */ + private void referenceLost(Long key) { + LOG.debug("Reference lost for threadID: {}", key); + } + + /** + * In case of reference loss for IOStatisticsContext. + * + * @param key ThreadID. + */ + private static void referenceLostContext(Long key) { + LOG.debug("Reference lost for threadID for the context: {}", key); + } + + /** + * A Method to get the IOStatisticsAggregator of the currentThread. This + * denotes the aggregated IOStatistics per thread. + * + * @return the instance of IOStatisticsAggregator for the current thread. 
+ */ + public IOStatisticsAggregator getThreadIOStatistics() { + // If Thread IOStats is disabled we return an emptyIOStatistics instance + // back. + if (!IS_THREAD_IOSTATS_ENABLED) { + return EmptyIOStatisticsStore.getInstance(); + } + + // If the current Thread ID already have an aggregator assigned, return + // that. + boolean isThreadIOStatsPresent = + threadIOStatsContext.containsKey(Thread.currentThread().getId()); + if (isThreadIOStatsPresent) { + return threadIOStatsContext.getForCurrentThread(); + } + + // If no aggregator is defined to the thread ID, create one and assign it. + IOStatisticsSnapshot ioStatisticsSnapshot = new IOStatisticsSnapshot(); + setThreadIOStatistics(ioStatisticsSnapshot); + return ioStatisticsSnapshot; + } + + /** + * Set the IOStatisticsAggregator for the current context for a specific + * thread. + * + * @param ioStatisticsAggregator IOStatisticsAggregator instance for the + * current thread. + */ + public void setThreadIOStatistics( + IOStatisticsAggregator ioStatisticsAggregator) { + threadIOStatsContext.setForCurrentThread(ioStatisticsAggregator); + } + + /** + * Returns a snapshot of the current thread's IOStatistics. + * + * @return IOStatisticsSnapshot of the current thread. + */ + public IOStatisticsSnapshot getThreadIOStatisticsSnapshot() { Review Comment: how does this get used. as it is not a snapshot if another thread is actively updating the thread. if this is to be used to get a copy of the stats, we should call the method `snapshotCurrentThreadIOStatistics()` and always return a snapshot, either an empty one or one off the current thread's stats. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java: ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.statistics.IOStatisticAssertions; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContext.currentIOStatisticsContext; + +/** + * Tests to verify the Thread-level IOStatistics. 
+ */ +public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase { + + private static final int SMALL_THREADS = 2; + private static final int BYTES_BIG = 100; + private static final int BYTES_SMALL = 50; + private static final long TEST_THREAD_ID = Thread.currentThread().getId(); + + /** + * Run this before the tests once, to note down some work in the + * constructor thread to be verified later on in a test. + */ + @BeforeClass + public static void beforeClass() throws Exception { + // Do some work in constructor thread. + S3AFileSystem fs = new S3AFileSystem(); + Configuration conf = new Configuration(); + fs.initialize(new URI(conf.get(TEST_FS_S3A_NAME)), conf); + Path path = new Path("testConstructor"); + try (FSDataOutputStream out = fs.create(path)) { + out.write('a'); + } + try (FSDataInputStream in = fs.open(path)) { + in.read(); + } + } + + @Override + protected Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + removeBaseAndBucketOverrides(configuration, + THREAD_LEVEL_IOSTATISTICS_ENABLED); + return configuration; + } + + /** + * Verify that S3AInputStream aggregates per thread IOStats collection + * correctly. 
+ */ + @Test + public void testS3AInputStreamIOStatisticsContext() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = dataset(256, 'a', 'z'); + byte[] readDataFirst = new byte[BYTES_BIG]; + byte[] readDataSecond = new byte[BYTES_SMALL]; + writeDataset(fs, path, data, data.length, 1024, true); + + final ExecutorService executor = + HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + CountDownLatch latch = new CountDownLatch(SMALL_THREADS); + + try { + for (int i = 0; i < SMALL_THREADS; i++) { + executor.submit(() -> { + try { + IOStatistics ioStatisticsFirst; + try (FSDataInputStream in = fs.open(path)) { + in.seek(50); + in.read(readDataFirst); + in.close(); + ioStatisticsFirst = assertThreadStatisticsBytesRead(in, + BYTES_BIG); + } + // Stream is closed for a thread. Re-open and do more operations. + IOStatistics ioStatisticsSecond; + try (FSDataInputStream in = fs.open(path)) { + in.seek(100); + in.read(readDataSecond); + in.close(); + ioStatisticsSecond = assertThreadStatisticsBytesRead(in, + BYTES_BIG + BYTES_SMALL); + } + latch.countDown(); + } catch (Exception e) { + latch.countDown(); + setFutureException(e); + LOG.error("An error occurred while doing a task in the thread", e); + } catch (AssertionError ase) { + latch.countDown(); + setFutureAse(ase); + throw ase; + } + }); + } + // wait for tasks to finish. + latch.await(); + } finally { + executor.shutdown(); + } + + // Check if an Excp or ASE was caught while the test threads were running. + maybeReThrowFutureException(); + maybeReThrowFutureASE(); + + } + + /** + * Verify that S3ABlockOutputStream aggregates per thread IOStats collection + * correctly. 
+ */ + @Test + public void testS3ABlockOutputStreamIOStatisticsContext() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] writeDataFirst = new byte[BYTES_BIG]; + byte[] writeDataSecond = new byte[BYTES_SMALL]; + + final ExecutorService executor = + HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + CountDownLatch latch = new CountDownLatch(SMALL_THREADS); + + try { + for (int i = 0; i < SMALL_THREADS; i++) { + executor.submit(() -> { + try { + IOStatistics ioStatisticsFirst; + try (FSDataOutputStream out = fs.create(path)) { + out.write(writeDataFirst); + out.close(); + ioStatisticsFirst = assertThreadStatisticsBytesWrite(out, + BYTES_BIG); + } + // Stream is closed for a thread. Re-open and do more operations. + IOStatistics ioStatisticsSecond; + try (FSDataOutputStream out = fs.create(path)) { + out.write(writeDataSecond); + out.close(); + ioStatisticsSecond = assertThreadStatisticsBytesWrite(out, + BYTES_BIG + BYTES_SMALL); + } + latch.countDown(); + } catch (Exception e) { + latch.countDown(); + setFutureException(e); + LOG.error("An error occurred while doing a task in the thread", e); + } catch (AssertionError ase) { + latch.countDown(); + setFutureAse(ase); + throw ase; + } + }); + } + // wait for tasks to finish. + latch.await(); + } finally { + executor.shutdown(); + } + + // Check if an Excp or ASE was caught while the test threads were running. + maybeReThrowFutureException(); + maybeReThrowFutureASE(); + + } + + /** + * Verify stats collection and aggregation for constructor thread, Junit + * thread and a worker thread. + */ + @Test + public void testThreadIOStatisticsForDifferentThreads() + throws IOException, InterruptedException { + S3AFileSystem fs = getFileSystem(); + Path path = path(getMethodName()); + byte[] data = new byte[BYTES_BIG]; + long threadIdForTest = Thread.currentThread().getId(); + + // Write in the Junit thread. 
+ try (FSDataOutputStream out = fs.create(path)) { + out.write(data); + } + + // Read in the Junit thread. + try (FSDataInputStream in = fs.open(path)) { + in.read(data); + } + + // Worker thread work and wait for it to finish. + TestWorkerThread workerThread = new TestWorkerThread(); + long workerThreadID = workerThread.getId(); + workerThread.start(); + workerThread.join(); + + // Work done in constructor: Wrote and Read 1 byte. + // Work done in Junit thread: Wrote and Read BYTES_BIG bytes. + // Work done in Junit's worker thread: Wrote and Read BYTES_SMALL bytes. + assertThreadStatisticsForThread(TEST_THREAD_ID, 1); + assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG); + assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL); + + } + + /** + * Assert bytes wrote by the current thread. + * + * @param out OutputStream. + * @param writeBytes expected bytes. + * @return IOStatistics for this stream. + */ + private IOStatistics assertThreadStatisticsBytesWrite(FSDataOutputStream out, + int writeBytes) { + S3ABlockOutputStream s3aOut = (S3ABlockOutputStream) out.getWrappedStream(); + IOStatistics ioStatistics = + (IOStatisticsSnapshot) s3aOut.getThreadIOStatistics(); + IOStatisticAssertions.assertThatStatisticCounter(ioStatistics, + StreamStatisticNames.STREAM_WRITE_BYTES) + .describedAs("Bytes wrote are not as expected") + .isEqualTo(writeBytes); + + return ioStatistics; + } + + /** + * Assert bytes read by the current thread. + * + * @param in InputStream. + * @param readBytes expected bytes. + * @return IOStatistics for this stream. 
+ */ + private IOStatistics assertThreadStatisticsBytesRead(FSDataInputStream in, + int readBytes) { + S3AInputStream s3AInputStream = + (S3AInputStream) in.getWrappedStream(); + IOStatistics ioStatistics = s3AInputStream.getThreadIOStatistics(); + IOStatisticAssertions.assertThatStatisticCounter(ioStatistics, + StreamStatisticNames.STREAM_READ_BYTES) + .describedAs("Bytes read are not as expected") + .isEqualTo(readBytes); + + return ioStatistics; + } + + /** + * Assert fixed bytes wrote and read for a particular thread ID. + * + * @param testThreadId thread ID. + * @param expectedBytesWrittenAndRead expected bytes. + */ + private void assertThreadStatisticsForThread(long testThreadId, + int expectedBytesWrittenAndRead) { + LOG.info("Thread ID to be asserted: {}", testThreadId); + IOStatistics ioStatistics = + currentIOStatisticsContext().getThreadIOStatistics(testThreadId); + + IOStatisticAssertions.assertThatStatisticCounter(ioStatistics, + StreamStatisticNames.STREAM_WRITE_BYTES) + .describedAs("Bytes wrote are not as expected for thread :{}", Review Comment: nit "written" ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java: ########## @@ -124,13 +124,15 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; Review Comment: nit, move down to L133 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
