[
https://issues.apache.org/jira/browse/HADOOP-17461?focusedWorklogId=793658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-793658
]
ASF GitHub Bot logged work on HADOOP-17461:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Jul/22 10:12
Start Date: 21/Jul/22 10:12
Worklog Time Spent: 10m
Work Description: steveloughran commented on code in PR #4352:
URL: https://github.com/apache/hadoop/pull/4352#discussion_r926485594
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static
org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static
org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * A Utility class for IOStatisticsContext, which helps in creating and
+ * getting the current active context. Static methods in this class allows to
+ * get the current context to start aggregating the IOStatistics.
+ *
+ * Static initializer is used to work out if the feature to collect
+ * thread-level IOStatistics is enabled or not and the corresponding
+ * implementation class is called for it.
+ *
+ * Weak Reference thread map to be used to keep track of different context's
+ * to avoid long-lived memory leakages as these references would be cleaned
+ * up at GC.
+ */
+public final class IOStatisticsContextIntegration {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(IOStatisticsContextIntegration.class);
+
+ /**
+ * Is thread-level IO Statistics enabled?
+ */
+ private static final boolean IS_THREAD_IOSTATS_ENABLED;
+
+ /**
+ * ID for next instance to create.
+ */
+ public static final AtomicLong INSTANCE_ID = new AtomicLong(1);
+
+ /**
+ * Active IOStatistics Context containing different worker thread's
+ * statistics. Weak Reference so that it gets cleaned up during GC and we
+ * avoid any memory leak issues due to long lived references.
+ */
+ private static final WeakReferenceThreadMap<IOStatisticsContext>
+ ACTIVE_IOSTATS_CONTEXT =
+ new WeakReferenceThreadMap<>(
+ IOStatisticsContextIntegration::createNewInstance,
+ IOStatisticsContextIntegration::referenceLostContext
+ );
+
+ static {
+ // Work out if the current context has thread level IOStatistics enabled.
+ final Configuration configuration = new Configuration();
+ IS_THREAD_IOSTATS_ENABLED =
+ configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+ THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+ }
+
+ /**
+ * Private constructor for a utility class to be used in IOStatisticsContext.
+ */
+ private IOStatisticsContextIntegration() {}
+
+ /**
+ * Creating a new IOStatisticsContext instance for a FS to be used.
+ * @param key Thread ID that represents which thread the context belongs to.
+ * @return an instance of IOStatisticsContext.
+ */
+ private static IOStatisticsContext createNewInstance(Long key) {
+ return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
+ }
+
+ /**
+ * In case of reference loss for IOStatisticsContext.
+ * @param key ThreadID.
+ */
+ private static void referenceLostContext(Long key) {
+ LOG.debug("Reference lost for threadID for the context: {}", key);
+ }
+
+ /**
+ * Get the current thread's IOStatisticsContext instance. If no instance is
+ * present for this thread ID, create one using the factory.
+ * @return instance of IOStatisticsContext.
+ */
+ public static IOStatisticsContext getCurrentIOStatisticsContext() {
+ return IS_THREAD_IOSTATS_ENABLED
+ ? ACTIVE_IOSTATS_CONTEXT.getForCurrentThread()
+ : EmptyIOStatisticsContextImpl.getInstance();
+ }
+
+ /**
+ * Set the IOStatisticsContext for the current thread.
+ * @param statisticsContext IOStatistics context instance for the
+ * current thread.
+ */
+ public static void setThreadIOStatisticsContext(
+ IOStatisticsContext statisticsContext) {
+ if (IS_THREAD_IOSTATS_ENABLED &&
+ ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() != statisticsContext) {
+ ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext);
Review Comment:
if statisticsContext is null, call remove() instead; lets us use it as the
resetter
Issue Time Tracking
-------------------
Worklog Id: (was: 793658)
Time Spent: 7h 10m (was: 7h)
> Add thread-level IOStatistics Context
> -------------------------------------
>
> Key: HADOOP-17461
> URL: https://issues.apache.org/jira/browse/HADOOP-17461
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs, fs/azure, fs/s3
> Affects Versions: 3.3.1
> Reporter: Steve Loughran
> Assignee: Mehakmeet Singh
> Priority: Major
> Labels: pull-request-available
> Time Spent: 7h 10m
> Remaining Estimate: 0h
>
> For effective reporting of the iostatistics of individual worker threads, we
> need a thread-level context which IO components update.
> * this contact needs to be passed in two background thread forming work on
> behalf of a task.
> * IO Components (streams, iterators, filesystems) need to update this context
> statistics as they perform work
> * Without double counting anything.
> I imagine a ThreadLocal IOStatisticContext which will be updated in the
> FileSystem API Calls. This context MUST be passed into the background threads
> used by a task, so that IO is correctly aggregated.
> I don't want streams, listIterators &c to do the updating as there is more
> risk of double counting. However, we need to see their statistics if we want
> to know things like "bytes discarded in backwards seeks". And I don't want to
> be updating a shared context object on every read() call.
> If all we want is store IO (HEAD, GET, DELETE, list performance etc) then the
> FS is sufficient.
> If we do want the stream-specific detail, then I propose
> * caching the context in the constructor
> * updating it only in close() or unbuffer() (as we do from S3AInputStream to
> S3AInstrumenation)
> * excluding those we know the FS already collects.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]