This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a89ed70dd45 Revert "HDDS-15287. Remove unused deferred/async RPC (#10280)"
a89ed70dd45 is described below
commit a89ed70dd457c667e415baaa32709cc7f1ee7e7f
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue May 19 08:26:23 2026 +0200
Revert "HDDS-15287. Remove unused deferred/async RPC (#10280)"
Cause: compile error
This reverts commit 05e45f3d796bc4f61663c8155aa15686f96b0e78.
---
.../ipc_/AsyncCallLimitExceededException.java | 36 +++++++
.../main/java/org/apache/hadoop/ipc_/Client.java | 111 ++++++++++++++++++-
.../org/apache/hadoop/ipc_/ProtobufRpcEngine.java | 76 ++++++++++++-
.../main/java/org/apache/hadoop/ipc_/Server.java | 119 ++++++++++++++++++---
.../hadoop/ipc_/metrics/RpcDetailedMetrics.java | 6 ++
.../org/apache/hadoop/ipc_/metrics/RpcMetrics.java | 33 ++++++
6 files changed, 364 insertions(+), 17 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/AsyncCallLimitExceededException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/AsyncCallLimitExceededException.java
new file mode 100644
index 00000000000..4050b68ff4f
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/AsyncCallLimitExceededException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc_;
+
+import java.io.IOException;
+
+/**
+ * Signals that an AsyncCallLimitExceededException has occurred. This class is
+ * used to make application code using async RPC aware that limit of max async
+ * calls is reached, application code need to retrieve results from response of
+ * established async calls to avoid buffer overflow in order for follow-on async
+ * calls going correctly.
+ */
+public class AsyncCallLimitExceededException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public AsyncCallLimitExceededException(String message) {
+ super(message);
+ }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
index 6c6ad67ca34..f1a67df3305 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
@@ -85,6 +85,21 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> retryCount = new
ThreadLocal<Integer>();
private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
= new ThreadLocal<>();
+ private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
+ ASYNC_RPC_RESPONSE = new ThreadLocal<>();
+ private static final ThreadLocal<Boolean> asynchronousMode =
+ new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ @SuppressWarnings("unchecked")
+ public static <T extends Writable> AsyncGet<T, IOException>
+ getAsyncRpcResponse() {
+ return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
+ }
/**
* Set call id and retry count for the next call.
@@ -120,6 +135,8 @@ public static void setCallIdAndRetryCount(int cid, int rc,
private final boolean fallbackAllowed;
private final boolean bindToWildCardAddress;
private final byte[] clientId;
+ private final int maxAsyncCalls;
+ private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
/**
* set the ping interval value in configuration
@@ -1281,6 +1298,9 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
CommonConfigurationKeys.IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT);
this.clientId = ClientId.getClientId();
+ this.maxAsyncCalls = conf.getInt(
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
/**
@@ -1369,6 +1389,28 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
fallbackToSimpleAuth, alignmentContext);
}
+ private void checkAsyncCall() throws IOException {
+ if (isAsynchronousMode()) {
+ if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
+ asyncCallCounter.decrementAndGet();
+ String errMsg = String.format(
+ "Exceeded limit of max asynchronous calls: %d, " +
+ "please configure %s to adjust it.",
+ maxAsyncCalls,
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
+ throw new AsyncCallLimitExceededException(errMsg);
+ }
+ }
+ }
+
+ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+ ConnectionId remoteId, int serviceClass,
+ AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
+ return call(rpcKind, rpcRequest, remoteId, serviceClass,
+ fallbackToSimpleAuth, null);
+ }
+
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
@@ -1394,6 +1436,7 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
fallbackToSimpleAuth);
try {
+ checkAsyncCall();
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
@@ -1406,10 +1449,76 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
throw ioe;
}
} catch(Exception e) {
+ if (isAsynchronousMode()) {
+ releaseAsyncCall();
+ }
throw e;
}
- return getRpcResponse(call, connection, -1, null);
+ if (isAsynchronousMode()) {
+ final AsyncGet<Writable, IOException> asyncGet
+ = new AsyncGet<Writable, IOException>() {
+ @Override
+ public Writable get(long timeout, TimeUnit unit)
+ throws IOException, TimeoutException{
+ boolean done = true;
+ try {
+ final Writable w = getRpcResponse(call, connection, timeout, unit);
+ if (w == null) {
+ done = false;
+ throw new TimeoutException(call + " timed out "
+ + timeout + " " + unit);
+ }
+ return w;
+ } finally {
+ if (done) {
+ releaseAsyncCall();
+ }
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ synchronized (call) {
+ return call.done;
+ }
+ }
+ };
+
+ ASYNC_RPC_RESPONSE.set(asyncGet);
+ return null;
+ } else {
+ return getRpcResponse(call, connection, -1, null);
+ }
+ }
+
+ /**
+ * Check if RPC is in asynchronous mode or not.
+ *
+ * @return true, if RPC is in asynchronous mode, otherwise false for
+ * synchronous mode.
+ */
+ public static boolean isAsynchronousMode() {
+ return asynchronousMode.get();
+ }
+
+ /**
+ * Set RPC to asynchronous or synchronous mode.
+ *
+ * @param async
+ * true, RPC will be in asynchronous mode, otherwise false for
+ * synchronous mode
+ */
+ public static void setAsynchronousMode(boolean async) {
+ asynchronousMode.set(async);
+ }
+
+ private void releaseAsyncCall() {
+ asyncCallCounter.decrementAndGet();
+ }
+
+ int getAsyncCallCount() {
+ return asyncCallCounter.get();
}
/** @return the rpc response or, in case of timeout, null. */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
index 0903eee7274..c2b7c4a01fc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -48,6 +50,8 @@
public class ProtobufRpcEngine implements RpcEngine {
public static final Logger LOG =
LoggerFactory.getLogger(ProtobufRpcEngine.class);
+ private static final ThreadLocal<AsyncGet<Message, Exception>>
+ ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for ProtobufRpcEngine
org.apache.hadoop.ipc_.Server.registerProtocolEngine(
@@ -57,6 +61,10 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache();
+ public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
+ return ASYNC_RETURN_MESSAGE.get();
+ }
+
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -224,7 +232,26 @@ public Message invoke(Object proxy, final Method method, Object[] args)
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}
- return getReturnMessage(method, val);
+ if (Client.isAsynchronousMode()) {
+ final AsyncGet<RpcWritable.Buffer, IOException> arr
+ = Client.getAsyncRpcResponse();
+ final AsyncGet<Message, Exception> asyncGet
+ = new AsyncGet<Message, Exception>() {
+ @Override
+ public Message get(long timeout, TimeUnit unit) throws Exception {
+ return getReturnMessage(method, arr.get(timeout, unit));
+ }
+
+ @Override
+ public boolean isDone() {
+ return arr.isDone();
+ }
+ };
+ ASYNC_RETURN_MESSAGE.set(asyncGet);
+ return null;
+ } else {
+ return getReturnMessage(method, val);
+ }
}
protected Writable constructRpcRequest(Method method, Message theRequest) {
@@ -314,6 +341,9 @@ public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
public static class Server extends RPC.Server {
+ static final ThreadLocal<ProtobufRpcEngineCallback> currentCallback =
+ new ThreadLocal<>();
+
static final ThreadLocal<CallInfo> currentCallInfo = new ThreadLocal<>();
static class CallInfo {
@@ -326,6 +356,43 @@ public CallInfo(RPC.Server server, String methodName) {
}
}
+ static class ProtobufRpcEngineCallbackImpl
+ implements ProtobufRpcEngineCallback {
+
+ private final RPC.Server server;
+ private final Call call;
+ private final String methodName;
+ private final long setupTime;
+
+ public ProtobufRpcEngineCallbackImpl() {
+ this.server = currentCallInfo.get().server;
+ this.call = Server.getCurCall().get();
+ this.methodName = currentCallInfo.get().methodName;
+ this.setupTime = Time.now();
+ }
+
+ @Override
+ public void setResponse(Message message) {
+ long processingTime = Time.now() - setupTime;
+ call.setDeferredResponse(RpcWritable.wrap(message));
+ server.updateDeferredMetrics(methodName, processingTime);
+ }
+
+ @Override
+ public void error(Throwable t) {
+ long processingTime = Time.now() - setupTime;
+ String detailedMetricsName = t.getClass().getSimpleName();
+ server.updateDeferredMetrics(detailedMetricsName, processingTime);
+ call.setDeferredError(t);
+ }
+ }
+
+ public static ProtobufRpcEngineCallback registerForDeferredResponse() {
+ ProtobufRpcEngineCallback callback = new ProtobufRpcEngineCallbackImpl();
+ currentCallback.set(callback);
+ return callback;
+ }
+
/**
* Construct an RPC server.
*
@@ -459,6 +526,13 @@ protected Writable call(RPC.Server server, String connectionProtocolName,
currentCallInfo.set(new CallInfo(server, methodName));
currentCall.setDetailedMetricsName(methodName);
result = service.callBlockingMethod(methodDescriptor, null, param);
+ // Check if this needs to be a deferred response,
+ // by checking the ThreadLocal callback being set
+ if (currentCallback.get() != null) {
+ currentCall.deferResponse();
+ currentCallback.set(null);
+ return null;
+ }
} catch (ServiceException e) {
Exception exception = (Exception) e.getCause();
currentCall.setDetailedMetricsName(
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
index 51543bca9f1..aabe01b1762 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
@@ -567,7 +567,7 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
rpcMetrics.addRpcQueueTime(queueTime);
- if (connDropped) {
+ if (call.isResponseDeferred() || connDropped) {
// call was skipped; don't include it in processing metrics
return;
}
@@ -588,6 +588,11 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
}
}
+ void updateDeferredMetrics(String name, long processingTime) {
+ rpcMetrics.addDeferredRpcProcessingTime(processingTime);
+ rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
+ }
+
/**
* A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
@@ -758,6 +763,7 @@ public static class Call implements Schedulable,
final RPC.RpcKind rpcKind;
final byte[] clientId;
private final CallerContext callerContext; // the call context
+ private boolean deferredResponse = false;
private int priorityLevel;
// the priority level assigned by scheduler, 0 by default
private long clientStateId;
@@ -910,6 +916,20 @@ public void markCallCoordinated(boolean flag) {
public boolean isCallCoordinated() {
return this.isCallCoordinated;
}
+
+ public void deferResponse() {
+ this.deferredResponse = true;
+ }
+
+ public boolean isResponseDeferred() {
+ return this.deferredResponse;
+ }
+
+ public void setDeferredResponse(Writable response) {
+ }
+
+ public void setDeferredError(Throwable t) {
+ }
}
/** A RPC extended call queued for handling. */
@@ -990,21 +1010,27 @@ public Void run() throws Exception {
} catch (Throwable e) {
populateResponseParamsOnError(e, responseParams);
}
- long deltaNanos = Time.monotonicNowNanos() - startNanos;
- ProcessingDetails details = getProcessingDetails();
+ if (!isResponseDeferred()) {
+ long deltaNanos = Time.monotonicNowNanos() - startNanos;
+ ProcessingDetails details = getProcessingDetails();
- details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
- deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
- deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
- deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
- details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
- startNanos = Time.monotonicNowNanos();
+ details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
+ deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
+ deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
+ deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
+ details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
+ startNanos = Time.monotonicNowNanos();
- setResponseFields(value, responseParams);
- sendResponse();
+ setResponseFields(value, responseParams);
+ sendResponse();
- deltaNanos = Time.monotonicNowNanos() - startNanos;
- details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
+ deltaNanos = Time.monotonicNowNanos() - startNanos;
+ details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deferring response for callId: " + this.callId);
+ }
+ }
return null;
}
@@ -1064,6 +1090,69 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException {
connection.sendResponse(call);
}
+ /**
+ * Send a deferred response, ignoring errors.
+ */
+ private void sendDeferedResponse() {
+ try {
+ connection.sendResponse(this);
+ } catch (Exception e) {
+ // For synchronous calls, application code is done once it's returned
+ // from a method. It does not expect to receive an error.
+ // This is equivalent to what happens in synchronous calls when the
+ // Responder is not able to send out the response.
+ LOG.error("Failed to send deferred response. ThreadName=" + Thread
+ .currentThread().getName() + ", CallId="
+ + callId + ", hostname=" + getHostAddress());
+ }
+ }
+
+ @Override
+ public void setDeferredResponse(Writable response) {
+ if (this.connection.getServer().running) {
+ try {
+ setupResponse(this, RpcStatusProto.SUCCESS, null, response,
+ null, null);
+ } catch (IOException e) {
+ // For synchronous calls, application code is done once it has
+ // returned from a method. It does not expect to receive an error.
+ // This is equivalent to what happens in synchronous calls when the
+ // response cannot be sent.
+ LOG.error(
+ "Failed to setup deferred successful response. ThreadName=" +
+ Thread.currentThread().getName() + ", Call=" + this);
+ return;
+ }
+ sendDeferedResponse();
+ }
+ }
+
+ @Override
+ public void setDeferredError(Throwable t) {
+ if (this.connection.getServer().running) {
+ if (t == null) {
+ t = new IOException(
+ "User code indicated an error without an exception");
+ }
+ try {
+ ResponseParams responseParams = new ResponseParams();
+ populateResponseParamsOnError(t, responseParams);
+ setupResponse(this, responseParams.returnStatus,
+ responseParams.detailedErr,
+ null, responseParams.errorClass, responseParams.error);
+ } catch (IOException e) {
+ // For synchronous calls, application code is done once it has
+ // returned from a method. It does not expect to receive an error.
+ // This is equivalent to what happens in synchronous calls when the
+ // response cannot be sent.
+ LOG.error(
+ "Failed to setup deferred error response. ThreadName=" +
+ Thread.currentThread().getName() + ", Call=" + this);
+ }
+ sendDeferedResponse();
+ }
+ }
+
/**
* Holds response parameters. Defaults set to work for successful
* invocations
@@ -2885,8 +2974,8 @@ public void run() {
if (call != null) {
updateMetrics(call, startTimeNanos, connDropped);
ProcessingDetails.LOG.debug(
- "Served: [{}] name={} user={} details={}",
- call,
+ "Served: [{}]{} name={} user={} details={}",
+ call, (call.isResponseDeferred() ? ", deferred" : ""),
call.getDetailedMetricsName(), call.getRemoteUser(),
call.getProcessingDetails());
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcDetailedMetrics.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcDetailedMetrics.java
index fed64834756..ee9309f2128 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcDetailedMetrics.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcDetailedMetrics.java
@@ -33,6 +33,7 @@
public class RpcDetailedMetrics {
@Metric MutableRatesWithAggregation rates;
+ @Metric MutableRatesWithAggregation deferredRpcRates;
static final Logger LOG = LoggerFactory.getLogger(RpcDetailedMetrics.class);
final MetricsRegistry registry;
@@ -58,6 +59,7 @@ public static RpcDetailedMetrics create(int port) {
*/
public void init(Class<?> protocol) {
rates.init(protocol);
+ deferredRpcRates.init(protocol);
}
/**
@@ -70,6 +72,10 @@ public void addProcessingTime(String rpcCallName, long processingTime) {
rates.add(rpcCallName, processingTime);
}
+ public void addDeferredProcessingTime(String name, long processingTime) {
+ deferredRpcRates.add(name, processingTime);
+ }
+
/**
* Shutdown the instrumentation for the process
*/
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcMetrics.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcMetrics.java
index b4726d04bbd..4e799837697 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcMetrics.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcMetrics.java
@@ -67,6 +67,8 @@ public class RpcMetrics {
new MutableQuantiles[intervals.length];
rpcProcessingTimeQuantiles =
new MutableQuantiles[intervals.length];
+ deferredRpcProcessingTimeQuantiles =
+ new MutableQuantiles[intervals.length];
for (int i = 0; i < intervals.length; i++) {
int interval = intervals[i];
rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
@@ -80,6 +82,10 @@ public class RpcMetrics {
"rpcProcessingTime" + interval + "s",
"rpc processing time in " + TIMEUNIT, "ops",
"latency", interval);
+ deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
+ "deferredRpcProcessingTime" + interval + "s",
+ "deferred rpc processing time in " + TIMEUNIT, "ops",
+ "latency", interval);
}
}
LOG.debug("Initialized " + registry);
@@ -100,6 +106,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
MutableQuantiles[] rpcLockWaitTimeQuantiles;
@Metric("Processing time") MutableRate rpcProcessingTime;
MutableQuantiles[] rpcProcessingTimeQuantiles;
+ @Metric("Deferred Processing time") MutableRate deferredRpcProcessingTime;
+ MutableQuantiles[] deferredRpcProcessingTimeQuantiles;
@Metric("Number of authentication failures")
MutableCounterLong rpcAuthenticationFailures;
@Metric("Number of authentication successes")
@@ -228,6 +236,15 @@ public void addRpcProcessingTime(long processingTime) {
}
}
+ public void addDeferredRpcProcessingTime(long processingTime) {
+ deferredRpcProcessingTime.add(processingTime);
+ if (rpcQuantileEnable) {
+ for (MutableQuantiles q : deferredRpcProcessingTimeQuantiles) {
+ q.add(processingTime);
+ }
+ }
+ }
+
/**
* One client backoff event
*/
@@ -282,6 +299,22 @@ public long getRpcSlowCalls() {
return rpcSlowCalls.value();
}
+ public MutableRate getDeferredRpcProcessingTime() {
+ return deferredRpcProcessingTime;
+ }
+
+ public long getDeferredRpcProcessingSampleCount() {
+ return deferredRpcProcessingTime.lastStat().numSamples();
+ }
+
+ public double getDeferredRpcProcessingMean() {
+ return deferredRpcProcessingTime.lastStat().mean();
+ }
+
+ public double getDeferredRpcProcessingStdDev() {
+ return deferredRpcProcessingTime.lastStat().stddev();
+ }
+
public MetricsTag getTag(String tagName) {
return registry.getTag(tagName);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]