[
https://issues.apache.org/jira/browse/HADOOP-18457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17627697#comment-17627697
]
ASF GitHub Bot commented on HADOOP-18457:
-----------------------------------------
pranavsaxena-microsoft commented on code in PR #5034:
URL: https://github.com/apache/hadoop/pull/5034#discussion_r1011670326
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -95,6 +95,18 @@ private AbfsClientThrottlingAnalyzer() {
analysisPeriodMs);
}
+ /**
+ * Resumes the timer if it was stopped.
+ */
+ public void resumeTimer() {
Review Comment:
Let's make it private.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -216,6 +218,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
return sharedKeyCredentials;
}
+ public AbfsThrottlingIntercept getIntercept() {
Review Comment:
Let's make it 'default' (package-private) access. Reason being, if it is public it
can be accessed by an end developer, who might call updateMetrics in such a way that
our client doesn't throttle suitably.
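For illustration, a minimal sketch of the suggested package-private accessor (the backing field name `intercept` is an assumption for this sketch, not taken from the PR):

```java
  // Package-private (default) access: visible within
  // org.apache.hadoop.fs.azurebfs.services, but not to end developers,
  // so external code cannot reach the intercept and skew its metrics.
  AbfsThrottlingIntercept getIntercept() {
    return intercept;  // 'intercept' field name assumed for this sketch
  }
```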
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
- metrics.bytesFailed.addAndGet(count);
- metrics.operationsFailed.incrementAndGet();
+ metrics.getBytesFailed().addAndGet(count);
+ metrics.getOperationsFailed().incrementAndGet();
} else {
- metrics.bytesSuccessful.addAndGet(count);
- metrics.operationsSuccessful.incrementAndGet();
+ metrics.getBytesSuccessful().addAndGet(count);
+ metrics.getOperationsSuccessful().incrementAndGet();
}
+ blobMetrics.set(metrics);
}
/**
* Suspends the current storage operation, as necessary, to reduce throughput.
* @return true if Thread sleeps(Throttling occurs) else false.
*/
public boolean suspendIfNecessary() {
+ if (isOperationOnAccountIdle.get()) {
+ resumeTimer();
+ }
Review Comment:
A still better approach, I feel (I just thought of it now): let's have a synchronized
block that deals with isOperationOnAccountIdle (for both suspend and resume), because
race conditions can still happen otherwise. For example:
1. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L266
2. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L133-L136
3. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L267-L269
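For illustration, a minimal sketch of the synchronized-block idea above; apart from isOperationOnAccountIdle and resumeTimer(), which the PR already has, the helper names here are assumptions:

```java
  // Sketch only: take one lock for both the suspend path and the timer task,
  // so the timer cannot be paused and resumed concurrently.
  private final Object idleStateLock = new Object();
  // java.util.concurrent.atomic.AtomicBoolean, already present in the PR
  private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);

  public boolean suspendIfNecessary() {
    synchronized (idleStateLock) {
      if (isOperationOnAccountIdle.get()) {
        isOperationOnAccountIdle.set(false);
        resumeTimer();                     // PR's method, restarts the analysis timer
      }
    }
    // ... existing throttling/sleep logic would follow here ...
    return false;
  }

  // The timer task takes the same lock before marking the account idle:
  private void pauseTimerIfAccountIdle() { // assumed helper name
    synchronized (idleStateLock) {
      if (accountIsIdle()) {               // assumed idle-detection check
        isOperationOnAccountIdle.set(true);
        pauseTimer();                      // assumed counterpart of resumeTimer()
      }
    }
  }
```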
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -38,35 +40,89 @@
* and sleeps just enough to minimize errors, allowing optimal ingress and/or
* egress throughput.
*/
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingIntercept.class);
private static final String RANGE_PREFIX = "bytes=";
- private static AbfsClientThrottlingIntercept singleton = null;
- private AbfsClientThrottlingAnalyzer readThrottler = null;
- private AbfsClientThrottlingAnalyzer writeThrottler = null;
- private static boolean isAutoThrottlingEnabled = false;
+ private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
+ private static final ReentrantLock LOCK = new ReentrantLock();
+ private final AbfsClientThrottlingAnalyzer readThrottler;
+ private final AbfsClientThrottlingAnalyzer writeThrottler;
+ private final String accountName;
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+ this.accountName = accountName;
+ this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system for
the account : {}", accountName);
}
- public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
- if (!enableAutoThrottling) {
- return;
- }
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+ //Account name is kept as empty as same instance is shared across all accounts
+ this.accountName = "";
+ this.readThrottler = setAnalyzer("read", abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system
using singleton intercept");
+ }
+
+ /**
+ * Sets the analyzer for the intercept.
+ * @param name Name of the analyzer.
+ * @param abfsConfiguration The configuration.
+ * @return AbfsClientThrottlingAnalyzer instance.
+ */
+ private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+ return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+ }
+
+ /**
+ * Returns the analyzer for read operations.
+ * @return AbfsClientThrottlingAnalyzer for read.
+ */
+ AbfsClientThrottlingAnalyzer getReadThrottler() {
+ return readThrottler;
+ }
+
+ /**
+ * Returns the analyzer for write operations.
+ * @return AbfsClientThrottlingAnalyzer for write.
+ */
+ AbfsClientThrottlingAnalyzer getWriteThrottler() {
+ return writeThrottler;
+ }
+
+ /**
+ * Creates a singleton object of the AbfsClientThrottlingIntercept.
+ * which is shared across all filesystem instances.
+ * @param abfsConfiguration configuration set.
+ * @return singleton object of intercept.
+ */
+ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
if (singleton == null) {
- singleton = new AbfsClientThrottlingIntercept();
- isAutoThrottlingEnabled = true;
- LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+ LOCK.lock();
+ try {
+ if (singleton == null) {
+ singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file
system.");
+ }
+ } finally {
+ LOCK.unlock();
+ }
}
+ return singleton;
}
- static void updateMetrics(AbfsRestOperationType operationType,
- AbfsHttpOperation abfsHttpOperation) {
- if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+ /**
+ * Updates the metrics for successful and failed read and write operations.
+ * @param operationType Only applicable for read and write operations.
+ * @param abfsHttpOperation Used for status code and data transfeered.
Review Comment:
nit: transferred.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -38,35 +40,89 @@
* and sleeps just enough to minimize errors, allowing optimal ingress and/or
* egress throughput.
*/
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingIntercept.class);
private static final String RANGE_PREFIX = "bytes=";
- private static AbfsClientThrottlingIntercept singleton = null;
- private AbfsClientThrottlingAnalyzer readThrottler = null;
- private AbfsClientThrottlingAnalyzer writeThrottler = null;
- private static boolean isAutoThrottlingEnabled = false;
+ private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
+ private static final ReentrantLock LOCK = new ReentrantLock();
+ private final AbfsClientThrottlingAnalyzer readThrottler;
+ private final AbfsClientThrottlingAnalyzer writeThrottler;
+ private final String accountName;
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+ this.accountName = accountName;
+ this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system for
the account : {}", accountName);
}
- public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
- if (!enableAutoThrottling) {
- return;
- }
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+ //Account name is kept as empty as same instance is shared across all accounts
+ this.accountName = "";
+ this.readThrottler = setAnalyzer("read", abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system
using singleton intercept");
+ }
+
+ /**
+ * Sets the analyzer for the intercept.
+ * @param name Name of the analyzer.
+ * @param abfsConfiguration The configuration.
+ * @return AbfsClientThrottlingAnalyzer instance.
+ */
+ private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+ return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+ }
+
+ /**
+ * Returns the analyzer for read operations.
+ * @return AbfsClientThrottlingAnalyzer for read.
+ */
+ AbfsClientThrottlingAnalyzer getReadThrottler() {
+ return readThrottler;
+ }
+
+ /**
+ * Returns the analyzer for write operations.
+ * @return AbfsClientThrottlingAnalyzer for write.
+ */
+ AbfsClientThrottlingAnalyzer getWriteThrottler() {
+ return writeThrottler;
+ }
+
+ /**
+ * Creates a singleton object of the AbfsClientThrottlingIntercept.
+ * which is shared across all filesystem instances.
+ * @param abfsConfiguration configuration set.
+ * @return singleton object of intercept.
+ */
+ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
if (singleton == null) {
- singleton = new AbfsClientThrottlingIntercept();
- isAutoThrottlingEnabled = true;
- LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+ LOCK.lock();
+ try {
+ if (singleton == null) {
+ singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file
system.");
+ }
+ } finally {
Review Comment:
We can remove the try/finally block.
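For illustration, the simplified form this comment appears to suggest; note it is only safe if nothing between lock() and unlock() can throw, otherwise the lock is never released:

```java
  static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
    if (singleton == null) {
      LOCK.lock();
      if (singleton == null) {
        singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
        LOG.debug("Client-side throttling is enabled for the ABFS file system.");
      }
      LOCK.unlock();  // skipped if the constructor throws; the try/finally in the PR guards against that
    }
    return singleton;
  }
```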
> ABFS: Support for account level throttling
> ------------------------------------------
>
> Key: HADOOP-18457
> URL: https://issues.apache.org/jira/browse/HADOOP-18457
> Project: Hadoop Common
> Issue Type: Sub-task
> Affects Versions: 3.3.4
> Reporter: Anmol Asrani
> Assignee: Anmol Asrani
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.0
>
>
> To add support for throttling at account level
--
This message was sent by Atlassian Jira
(v8.20.10#820010)