[ https://issues.apache.org/jira/browse/HADOOP-17461?focusedWorklogId=792243&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-792243 ]
ASF GitHub Bot logged work on HADOOP-17461: ------------------------------------------- Author: ASF GitHub Bot Created on: 18/Jul/22 16:48 Start Date: 18/Jul/22 16:48 Worklog Time Spent: 10m Work Description: steveloughran commented on code in PR #4352: URL: https://github.com/apache/hadoop/pull/4352#discussion_r920337912 ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.statistics.impl; + +import java.lang.ref.WeakReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsSnapshot if thread level aggregation is enabled else returning + * an instance of EmptyIOStatisticsStore for the FS. An active instance of + * the IOStatisticsContext can be used to collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContextImpl implements IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContextImpl.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + /** + * Active IOStatistics Context containing different worker thread's + * statistics. Weak Reference so that it gets cleaned up during GC and we + * avoid any memory leak issues due to long lived references. 
+ */ + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = + new WeakReferenceThreadMap<>(IOStatisticsContextImpl::createNewInstance, + IOStatisticsContextImpl::referenceLostContext + ); + + /** + * Collecting IOStatistics per thread. + */ + private final WeakReferenceThreadMap<IOStatisticsSnapshot> + threadLevelIOStatisticsMap = + new WeakReferenceThreadMap<>(this::getIOStatisticsSnapshotFactory, + this::referenceLost); + + static { + // Work out if the current context has thread level IOStatistics enabled. + final Configuration configuration = new Configuration(); + IS_THREAD_IOSTATS_ENABLED = + configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED, + THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT); + } + + /** + * Creating a new IOStatisticsContext instance for a FS to be used. If Review Comment: cropped sentence at end ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.statistics.impl; + +import java.lang.ref.WeakReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsSnapshot if thread level aggregation is enabled else returning + * an instance of EmptyIOStatisticsStore for the FS. An active instance of + * the IOStatisticsContext can be used to collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContextImpl implements IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContextImpl.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + /** + * Active IOStatistics Context containing different worker thread's + * statistics. Weak Reference so that it gets cleaned up during GC and we + * avoid any memory leak issues due to long lived references. 
+ */ + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = + new WeakReferenceThreadMap<>(IOStatisticsContextImpl::createNewInstance, + IOStatisticsContextImpl::referenceLostContext + ); + + /** + * Collecting IOStatistics per thread. + */ + private final WeakReferenceThreadMap<IOStatisticsSnapshot> + threadLevelIOStatisticsMap = + new WeakReferenceThreadMap<>(this::getIOStatisticsSnapshotFactory, + this::referenceLost); + + static { + // Work out if the current context has thread level IOStatistics enabled. + final Configuration configuration = new Configuration(); + IS_THREAD_IOSTATS_ENABLED = + configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED, + THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT); + } + + /** + * Creating a new IOStatisticsContext instance for a FS to be used. If + * + * @param key Thread ID that represents which thread the context belongs to. + * @return an instance of IOStatisticsContext. + */ + private static IOStatisticsContext createNewInstance(Long key) { + if (!IS_THREAD_IOSTATS_ENABLED) { + return new EmptyIOStatisticsContextImpl(); + } + return new IOStatisticsContextImpl(); + } + + /** + * A Method to act as an IOStatisticsSnapshot factory, in a + * WeakReferenceThreadMap. + * + * @param key ThreadID. + * @return an Instance of IOStatisticsSnapshot. + */ + private IOStatisticsSnapshot getIOStatisticsSnapshotFactory(Long key) { + return new IOStatisticsSnapshot(); + } + + /** + * In case of reference loss. + * + * @param key ThreadID. + */ + private void referenceLost(Long key) { + LOG.debug("Reference lost for threadID: {}", key); + } + + /** + * In case of reference loss for IOStatisticsContext. + * + * @param key ThreadID. + */ + private static void referenceLostContext(Long key) { + LOG.debug("Reference lost for threadID for the context: {}", key); + } + + /** + * Get the current thread's IOStatisticsContext instance. 
If no instance is + * present for this thread ID, create one using the factory. + * + * @return instance of IOStatisticsContext. + */ + @Override + public IOStatisticsContext getCurrentIOStatisticsContext() { + return ACTIVE_IOSTATS_CONTEXT.getForCurrentThread(); + } + + /** + * A Method to get the IOStatisticsAggregator of the currentThread. This + * denotes the aggregated IOStatistics per thread. + * + * @return the instance of IOStatisticsAggregator for the current thread. + */ + @Override + public IOStatisticsAggregator getThreadIOStatisticsAggregator() { + // If the current Thread ID already have an aggregator assigned, return + // that. + boolean isThreadIOStatsPresent = Review Comment: use the context map and ask it for its aggregator ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java: ########## @@ -48,7 +49,15 @@ public long currentThreadId() { } public V setForCurrentThread(V newVal) { - return put(currentThreadId(), newVal); + long id = currentThreadId(); Review Comment: do we need a precondition on a null value here? ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -20,22 +20,28 @@ import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; /** * An interface defined to capture thread-level IOStatistics by using per - * thread context consisting of IOStatisticsSnapshot thread map for each - * worker thread. - * EmptyIOStatisticsSource is returned as an aggregator if this feature is - * disabled, resulting in a no-op in aggregation. + * thread context. + * <p> + * The aggregator should be collected in their constructor by statistics-generating + * classes to obtain the aggregator to update <i>across all threads</i>. 
+ * <p> + * The {@link #snapshot()} call creates a snapshot of the statistics; + * <p> + * The {@link #reset()} call resets the statistics in the current thread so + * that later snapshots will get the incremental data. */ -public interface IOStatisticsContext { +public interface IOStatisticsContext extends IOStatisticsSource { /** - * Get the current thread's IOStatisticsContext. + * Get the IOStatisticsAggregator for the current thread. Review Comment: "for the context" ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -20,22 +20,28 @@ import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; /** * An interface defined to capture thread-level IOStatistics by using per - * thread context consisting of IOStatisticsSnapshot thread map for each - * worker thread. - * EmptyIOStatisticsSource is returned as an aggregator if this feature is - * disabled, resulting in a no-op in aggregation. + * thread context. + * <p> + * The aggregator should be collected in their constructor by statistics-generating + * classes to obtain the aggregator to update <i>across all threads</i>. + * <p> + * The {@link #snapshot()} call creates a snapshot of the statistics; + * <p> + * The {@link #reset()} call resets the statistics in the current thread so + * that later snapshots will get the incremental data. */ -public interface IOStatisticsContext { +public interface IOStatisticsContext extends IOStatisticsSource { /** - * Get the current thread's IOStatisticsContext. + * Get the IOStatisticsAggregator for the current thread. * - * @return instance of IOStatisticsContext for the current thread. + * @return return the aggregator for current thread. 
*/ - IOStatisticsContext getCurrentIOStatisticsContext(); + IOStatisticsAggregator getAggregator(); /** * Capture the snapshot of current thread's IOStatistics. Review Comment: "of the context's" ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import java.lang.ref.WeakReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext. 
+ * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsSnapshot if thread level aggregation is enabled else returning + * an instance of EmptyIOStatisticsStore for the FS. An active instance of + * the IOStatisticsContext can be used to collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContextImpl implements IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContextImpl.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + /** + * Active IOStatistics Context containing different worker thread's + * statistics. Weak Reference so that it gets cleaned up during GC and we + * avoid any memory leak issues due to long lived references. + */ + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = + new WeakReferenceThreadMap<>(IOStatisticsContextImpl::createNewInstance, + IOStatisticsContextImpl::referenceLostContext + ); + + /** + * Collecting IOStatistics per thread. + */ + private final WeakReferenceThreadMap<IOStatisticsSnapshot> Review Comment: the context map can support this; just look it up and call getThreadIOStatisticsAggregator() on it ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java: ########## @@ -66,20 +66,21 @@ public void setUp() throws Exception { */ @BeforeClass public static void beforeClass() throws Exception { - currentIOStatisticsContext().resetThreadIOStatisticsForCurrentThread(); // Do some work in constructor thread. 
S3AFileSystem fs = new S3AFileSystem(); Configuration conf = new Configuration(); fs.initialize(new URI(conf.get(TEST_FS_S3A_NAME)), conf); Path path = new Path("testConstructor"); + IOStatisticsContextImpl ioStatisticsContext = fs.getIoStatisticsContext(); Review Comment: should be IOStatisticsContext ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java: ########## @@ -5231,4 +5234,13 @@ public RequestFactory getRequestFactory() { public boolean isCSEEnabled() { return isCSEEnabled; } + + /** + * Get the FileSystem's IOStatisticsContext. + * @return the instance of IOStatisticsContextImpl. + */ + @VisibleForTesting + public IOStatisticsContextImpl getIoStatisticsContext() { Review Comment: don't cast. if tests want to cast it, and they know it is safe, they can do so ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java: ########## @@ -66,6 +70,12 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { private static final Logger LOG = LoggerFactory.getLogger(AbstractCommitITest.class); + /** + * Job statistics accrued across all test cases Review Comment: needs a . ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -20,22 +20,28 @@ import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; /** * An interface defined to capture thread-level IOStatistics by using per - * thread context consisting of IOStatisticsSnapshot thread map for each - * worker thread. - * EmptyIOStatisticsSource is returned as an aggregator if this feature is - * disabled, resulting in a no-op in aggregation. + * thread context. 
+ * <p> + * The aggregator should be collected in their constructor by statistics-generating + * classes to obtain the aggregator to update <i>across all threads</i>. + * <p> + * The {@link #snapshot()} call creates a snapshot of the statistics; + * <p> + * The {@link #reset()} call resets the statistics in the current thread so + * that later snapshots will get the incremental data. */ -public interface IOStatisticsContext { +public interface IOStatisticsContext extends IOStatisticsSource { /** - * Get the current thread's IOStatisticsContext. + * Get the IOStatisticsAggregator for the current thread. * - * @return instance of IOStatisticsContext for the current thread. + * @return return the aggregator for current thread. Review Comment: "for the context" ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.statistics.impl; + +import java.lang.ref.WeakReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED; +import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT; + +/** + * Implementing the IOStatisticsContext. + * + * A Context defined for IOStatistics collection per thread which captures + * each worker thread's work in FS streams and stores it in the form of + * IOStatisticsSnapshot if thread level aggregation is enabled else returning + * an instance of EmptyIOStatisticsStore for the FS. An active instance of + * the IOStatisticsContext can be used to collect the statistics. + * + * For the current thread the IOStatisticsSnapshot can be used as a way to + * move the IOStatistics data between applications using the Serializable + * nature of the class. + */ +public class IOStatisticsContextImpl implements IOStatisticsContext { + private static final Logger LOG = + LoggerFactory.getLogger(IOStatisticsContextImpl.class); + private static final boolean IS_THREAD_IOSTATS_ENABLED; + + /** + * Active IOStatistics Context containing different worker thread's + * statistics. Weak Reference so that it gets cleaned up during GC and we + * avoid any memory leak issues due to long lived references. 
+ */ + private static final WeakReferenceThreadMap<IOStatisticsContext> + ACTIVE_IOSTATS_CONTEXT = + new WeakReferenceThreadMap<>(IOStatisticsContextImpl::createNewInstance, + IOStatisticsContextImpl::referenceLostContext + ); + + /** + * Collecting IOStatistics per thread. + */ + private final WeakReferenceThreadMap<IOStatisticsSnapshot> + threadLevelIOStatisticsMap = + new WeakReferenceThreadMap<>(this::getIOStatisticsSnapshotFactory, + this::referenceLost); + + static { + // Work out if the current context has thread level IOStatistics enabled. + final Configuration configuration = new Configuration(); + IS_THREAD_IOSTATS_ENABLED = + configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED, + THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT); + } + + /** + * Creating a new IOStatisticsContext instance for a FS to be used. If + * + * @param key Thread ID that represents which thread the context belongs to. + * @return an instance of IOStatisticsContext. + */ + private static IOStatisticsContext createNewInstance(Long key) { + if (!IS_THREAD_IOSTATS_ENABLED) { + return new EmptyIOStatisticsContextImpl(); + } + return new IOStatisticsContextImpl(); + } + + /** + * A Method to act as an IOStatisticsSnapshot factory, in a + * WeakReferenceThreadMap. + * + * @param key ThreadID. + * @return an Instance of IOStatisticsSnapshot. + */ + private IOStatisticsSnapshot getIOStatisticsSnapshotFactory(Long key) { + return new IOStatisticsSnapshot(); + } + + /** + * In case of reference loss. + * + * @param key ThreadID. + */ + private void referenceLost(Long key) { + LOG.debug("Reference lost for threadID: {}", key); + } + + /** + * In case of reference loss for IOStatisticsContext. + * + * @param key ThreadID. + */ + private static void referenceLostContext(Long key) { + LOG.debug("Reference lost for threadID for the context: {}", key); + } + + /** + * Get the current thread's IOStatisticsContext instance. 
If no instance is + * present for this thread ID, create one using the factory. + * + * @return instance of IOStatisticsContext. + */ + @Override + public IOStatisticsContext getCurrentIOStatisticsContext() { + return ACTIVE_IOSTATS_CONTEXT.getForCurrentThread(); + } + + /** + * A Method to get the IOStatisticsAggregator of the currentThread. This + * denotes the aggregated IOStatistics per thread. + * + * @return the instance of IOStatisticsAggregator for the current thread. + */ + @Override + public IOStatisticsAggregator getThreadIOStatisticsAggregator() { + // If the current Thread ID already have an aggregator assigned, return + // that. + boolean isThreadIOStatsPresent = + threadLevelIOStatisticsMap.containsKey(Thread.currentThread().getId()); + if (isThreadIOStatsPresent) { + return threadLevelIOStatisticsMap.getForCurrentThread(); + } + LOG.debug("No thread iostats present creating it for :{}", + Thread.currentThread().getId()); + // If no aggregator is defined to the thread ID, create one and assign it. + IOStatisticsSnapshot ioStatisticsSnapshot = new IOStatisticsSnapshot(); + setThreadIOStatistics(ioStatisticsSnapshot); + return ioStatisticsSnapshot; + } + + /** + * Set the IOStatisticsSnapshot for the current context for a specific + * thread. + * + * @param ioStatisticsSnapshot IOStatisticsAggregator instance for the + * current thread. + */ + public void setThreadIOStatistics( + IOStatisticsSnapshot ioStatisticsSnapshot) { + threadLevelIOStatisticsMap.setForCurrentThread(ioStatisticsSnapshot); + } + + /** + * Returns a snapshot of the current thread's IOStatistics. + * + * @return IOStatisticsSnapshot of the current thread. + */ + @Override + public IOStatisticsSnapshot snapshot() { + if (threadLevelIOStatisticsMap.containsKey(getCurrentThreadID())) { + return threadLevelIOStatisticsMap.get(getCurrentThreadID()); + } + return new IOStatisticsSnapshot(); + } + + /** + * Reset the thread IOStatistics for current thread. 
+ */ + @Override + public void reset() { + WeakReference<IOStatisticsSnapshot> ioStatisticsSnapshotRef = + threadLevelIOStatisticsMap.lookup(getCurrentThreadID()); + if (ioStatisticsSnapshotRef != null) { + IOStatisticsSnapshot ioStatisticsSnapshot = ioStatisticsSnapshotRef.get(); + // Get the snapshot for the current thread ID and clear it. + if(ioStatisticsSnapshot != null) { + ioStatisticsSnapshot.clear(); + } + } + } + + /** + * Get the current thread's ID. + * @return long value of the thread ID. + */ + private Long getCurrentThreadID() { + return Thread.currentThread().getId(); + } + + /** + * Get thread ID specific IOStatistics values. + * + * @param testThreadId thread ID. + * @return IOStatistics instance. + */ + @VisibleForTesting + public IOStatistics getThreadSpecificIOStatistics(long testThreadId) { Review Comment: tests should also look at the context map; lets them test more ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContext.java: ########## @@ -20,22 +20,28 @@ import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; /** * An interface defined to capture thread-level IOStatistics by using per - * thread context consisting of IOStatisticsSnapshot thread map for each - * worker thread. - * EmptyIOStatisticsSource is returned as an aggregator if this feature is - * disabled, resulting in a no-op in aggregation. + * thread context. + * <p> + * The aggregator should be collected in their constructor by statistics-generating + * classes to obtain the aggregator to update <i>across all threads</i>. + * <p> + * The {@link #snapshot()} call creates a snapshot of the statistics; + * <p> + * The {@link #reset()} call resets the statistics in the current thread so + * that later snapshots will get the incremental data. 
*/ -public interface IOStatisticsContext { +public interface IOStatisticsContext extends IOStatisticsSource { Review Comment: all the javadocs saying "the current thread" except for the static method to do the lookup MUST be changed to just say "the context" ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java: ########## @@ -48,7 +49,15 @@ public long currentThreadId() { } public V setForCurrentThread(V newVal) { - return put(currentThreadId(), newVal); + long id = currentThreadId(); + + // if the same object is already in the map, just return it. + WeakReference<V> ref = lookup(id); + if (ref != null && ref.get() == newVal) { Review Comment: if there's been a GC ref.get could be null. now, as we are doing == and not Object.equals that is safe. we should mention that as a comment Issue Time Tracking ------------------- Worklog Id: (was: 792243) Time Spent: 5.5h (was: 5h 20m) > Add thread-level IOStatistics Context > ------------------------------------- > > Key: HADOOP-17461 > URL: https://issues.apache.org/jira/browse/HADOOP-17461 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs, fs/azure, fs/s3 > Affects Versions: 3.3.1 > Reporter: Steve Loughran > Assignee: Mehakmeet Singh > Priority: Major > Labels: pull-request-available > Time Spent: 5.5h > Remaining Estimate: 0h > > For effective reporting of the iostatistics of individual worker threads, we > need a thread-level context which IO components update. > * this context needs to be passed into the background threads performing work on > behalf of a task. > * IO Components (streams, iterators, filesystems) need to update this context > statistics as they perform work. > * Without double counting anything. > I imagine a ThreadLocal IOStatisticContext which will be updated in the > FileSystem API Calls. This context MUST be passed into the background threads > used by a task, so that IO is correctly aggregated.
> I don't want streams, listIterators &c to do the updating as there is more > risk of double counting. However, we need to see their statistics if we want > to know things like "bytes discarded in backwards seeks". And I don't want to > be updating a shared context object on every read() call. > If all we want is store IO statistics (HEAD, GET, DELETE, list performance etc.), then the > FS is sufficient. > If we do want the stream-specific detail, then I propose > * caching the context in the constructor > * updating it only in close() or unbuffer() (as we do from S3AInputStream to > S3AInstrumentation) > * excluding those we know the FS already collects. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org