This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a89ed70dd45 Revert "HDDS-15287. Remove unused deferred/async RPC 
(#10280)"
a89ed70dd45 is described below

commit a89ed70dd457c667e415baaa32709cc7f1ee7e7f
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue May 19 08:26:23 2026 +0200

    Revert "HDDS-15287. Remove unused deferred/async RPC (#10280)"
    
    Cause: compile error
    
    This reverts commit 05e45f3d796bc4f61663c8155aa15686f96b0e78.
---
 .../ipc_/AsyncCallLimitExceededException.java      |  36 +++++++
 .../main/java/org/apache/hadoop/ipc_/Client.java   | 111 ++++++++++++++++++-
 .../org/apache/hadoop/ipc_/ProtobufRpcEngine.java  |  76 ++++++++++++-
 .../main/java/org/apache/hadoop/ipc_/Server.java   | 119 ++++++++++++++++++---
 .../hadoop/ipc_/metrics/RpcDetailedMetrics.java    |   6 ++
 .../org/apache/hadoop/ipc_/metrics/RpcMetrics.java |  33 ++++++
 6 files changed, 364 insertions(+), 17 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/AsyncCallLimitExceededException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/AsyncCallLimitExceededException.java
new file mode 100644
index 00000000000..4050b68ff4f
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/AsyncCallLimitExceededException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc_;
+
+import java.io.IOException;
+
+/**
+ * Signals that an AsyncCallLimitExceededException has occurred. This class is
+ * used to make application code using async RPC aware that limit of max async
+ * calls is reached, application code need to retrieve results from response of
+ * established async calls to avoid buffer overflow in order for follow-on 
async
+ * calls going correctly.
+ */
+public class AsyncCallLimitExceededException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public AsyncCallLimitExceededException(String message) {
+    super(message);
+  }
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
index 6c6ad67ca34..f1a67df3305 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java
@@ -85,6 +85,21 @@ public class Client implements AutoCloseable {
   private static final ThreadLocal<Integer> retryCount = new 
ThreadLocal<Integer>();
   private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
       = new ThreadLocal<>();
+  private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
+      ASYNC_RPC_RESPONSE = new ThreadLocal<>();
+  private static final ThreadLocal<Boolean> asynchronousMode =
+      new ThreadLocal<Boolean>() {
+        @Override
+        protected Boolean initialValue() {
+          return false;
+        }
+      };
+
+  @SuppressWarnings("unchecked")
+  public static <T extends Writable> AsyncGet<T, IOException>
+      getAsyncRpcResponse() {
+    return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
+  }
 
   /**
    * Set call id and retry count for the next call.
@@ -120,6 +135,8 @@ public static void setCallIdAndRetryCount(int cid, int rc,
   private final boolean fallbackAllowed;
   private final boolean bindToWildCardAddress;
   private final byte[] clientId;
+  private final int maxAsyncCalls;
+  private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
 
   /**
    * set the ping interval value in configuration
@@ -1281,6 +1298,9 @@ public Client(Class<? extends Writable> valueClass, 
Configuration conf,
             CommonConfigurationKeys.IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT);
 
     this.clientId = ClientId.getClientId();
+    this.maxAsyncCalls = conf.getInt(
+        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
   }
 
   /**
@@ -1369,6 +1389,28 @@ public Writable call(RPC.RpcKind rpcKind, Writable 
rpcRequest,
         fallbackToSimpleAuth, alignmentContext);
   }
 
+  private void checkAsyncCall() throws IOException {
+    if (isAsynchronousMode()) {
+      if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
+        asyncCallCounter.decrementAndGet();
+        String errMsg = String.format(
+            "Exceeded limit of max asynchronous calls: %d, " +
+            "please configure %s to adjust it.",
+            maxAsyncCalls,
+            CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
+        throw new AsyncCallLimitExceededException(errMsg);
+      }
+    }
+  }
+
+  Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+                ConnectionId remoteId, int serviceClass,
+                AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
+    return call(rpcKind, rpcRequest, remoteId, serviceClass,
+        fallbackToSimpleAuth, null);
+  }
+
   /**
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the rpc response.
@@ -1394,6 +1436,7 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
         fallbackToSimpleAuth);
 
     try {
+      checkAsyncCall();
       try {
         connection.sendRpcRequest(call);                 // send the rpc 
request
       } catch (RejectedExecutionException e) {
@@ -1406,10 +1449,76 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
         throw ioe;
       }
     } catch(Exception e) {
+      if (isAsynchronousMode()) {
+        releaseAsyncCall();
+      }
       throw e;
     }
 
-    return getRpcResponse(call, connection, -1, null);
+    if (isAsynchronousMode()) {
+      final AsyncGet<Writable, IOException> asyncGet
+          = new AsyncGet<Writable, IOException>() {
+        @Override
+        public Writable get(long timeout, TimeUnit unit)
+            throws IOException, TimeoutException{
+          boolean done = true;
+          try {
+            final Writable w = getRpcResponse(call, connection, timeout, unit);
+            if (w == null) {
+              done = false;
+              throw new TimeoutException(call + " timed out "
+                  + timeout + " " + unit);
+            }
+            return w;
+          } finally {
+            if (done) {
+              releaseAsyncCall();
+            }
+          }
+        }
+
+        @Override
+        public boolean isDone() {
+          synchronized (call) {
+            return call.done;
+          }
+        }
+      };
+
+      ASYNC_RPC_RESPONSE.set(asyncGet);
+      return null;
+    } else {
+      return getRpcResponse(call, connection, -1, null);
+    }
+  }
+
+  /**
+   * Check if RPC is in asynchronous mode or not.
+   *
+   * @return true, if RPC is in asynchronous mode, otherwise false for
+   *          synchronous mode.
+   */
+  public static boolean isAsynchronousMode() {
+    return asynchronousMode.get();
+  }
+
+  /**
+   * Set RPC to asynchronous or synchronous mode.
+   *
+   * @param async
+   *          true, RPC will be in asynchronous mode, otherwise false for
+   *          synchronous mode
+   */
+  public static void setAsynchronousMode(boolean async) {
+    asynchronousMode.set(async);
+  }
+
+  private void releaseAsyncCall() {
+    asyncCallCounter.decrementAndGet();
+  }
+
+  int getAsyncCallCount() {
+    return asyncCallCounter.get();
   }
 
   /** @return the rpc response or, in case of timeout, null. */
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
index 0903eee7274..c2b7c4a01fc 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,7 @@
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -48,6 +50,8 @@
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Logger LOG =
       LoggerFactory.getLogger(ProtobufRpcEngine.class);
+  private static final ThreadLocal<AsyncGet<Message, Exception>>
+      ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
 
   static { // Register the rpcRequest deserializer for ProtobufRpcEngine
     org.apache.hadoop.ipc_.Server.registerProtocolEngine(
@@ -57,6 +61,10 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
+  public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
+    return ASYNC_RETURN_MESSAGE.get();
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -224,7 +232,26 @@ public Message invoke(Object proxy, final Method method, 
Object[] args)
         LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
       }
       
-      return getReturnMessage(method, val);
+      if (Client.isAsynchronousMode()) {
+        final AsyncGet<RpcWritable.Buffer, IOException> arr
+            = Client.getAsyncRpcResponse();
+        final AsyncGet<Message, Exception> asyncGet
+            = new AsyncGet<Message, Exception>() {
+          @Override
+          public Message get(long timeout, TimeUnit unit) throws Exception {
+            return getReturnMessage(method, arr.get(timeout, unit));
+          }
+
+          @Override
+          public boolean isDone() {
+            return arr.isDone();
+          }
+        };
+        ASYNC_RETURN_MESSAGE.set(asyncGet);
+        return null;
+      } else {
+        return getReturnMessage(method, val);
+      }
     }
 
     protected Writable constructRpcRequest(Method method, Message theRequest) {
@@ -314,6 +341,9 @@ public RPC.Server getServer(Class<?> protocol, Object 
protocolImpl,
   
   public static class Server extends RPC.Server {
 
+    static final ThreadLocal<ProtobufRpcEngineCallback> currentCallback =
+        new ThreadLocal<>();
+
     static final ThreadLocal<CallInfo> currentCallInfo = new ThreadLocal<>();
 
     static class CallInfo {
@@ -326,6 +356,43 @@ public CallInfo(RPC.Server server, String methodName) {
       }
     }
 
+    static class ProtobufRpcEngineCallbackImpl
+        implements ProtobufRpcEngineCallback {
+
+      private final RPC.Server server;
+      private final Call call;
+      private final String methodName;
+      private final long setupTime;
+
+      public ProtobufRpcEngineCallbackImpl() {
+        this.server = currentCallInfo.get().server;
+        this.call = Server.getCurCall().get();
+        this.methodName = currentCallInfo.get().methodName;
+        this.setupTime = Time.now();
+      }
+
+      @Override
+      public void setResponse(Message message) {
+        long processingTime = Time.now() - setupTime;
+        call.setDeferredResponse(RpcWritable.wrap(message));
+        server.updateDeferredMetrics(methodName, processingTime);
+      }
+
+      @Override
+      public void error(Throwable t) {
+        long processingTime = Time.now() - setupTime;
+        String detailedMetricsName = t.getClass().getSimpleName();
+        server.updateDeferredMetrics(detailedMetricsName, processingTime);
+        call.setDeferredError(t);
+      }
+    }
+
+    public static ProtobufRpcEngineCallback registerForDeferredResponse() {
+      ProtobufRpcEngineCallback callback = new ProtobufRpcEngineCallbackImpl();
+      currentCallback.set(callback);
+      return callback;
+    }
+
     /**
      * Construct an RPC server.
      * 
@@ -459,6 +526,13 @@ protected Writable call(RPC.Server server, String 
connectionProtocolName,
           currentCallInfo.set(new CallInfo(server, methodName));
           currentCall.setDetailedMetricsName(methodName);
           result = service.callBlockingMethod(methodDescriptor, null, param);
+          // Check if this needs to be a deferred response,
+          // by checking the ThreadLocal callback being set
+          if (currentCallback.get() != null) {
+            currentCall.deferResponse();
+            currentCallback.set(null);
+            return null;
+          }
         } catch (ServiceException e) {
           Exception exception = (Exception) e.getCause();
           currentCall.setDetailedMetricsName(
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
index 51543bca9f1..aabe01b1762 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java
@@ -567,7 +567,7 @@ void updateMetrics(Call call, long startTime, boolean 
connDropped) {
     long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
     rpcMetrics.addRpcQueueTime(queueTime);
 
-    if (connDropped) {
+    if (call.isResponseDeferred() || connDropped) {
       // call was skipped; don't include it in processing metrics
       return;
     }
@@ -588,6 +588,11 @@ void updateMetrics(Call call, long startTime, boolean 
connDropped) {
     }
   }
 
+  void updateDeferredMetrics(String name, long processingTime) {
+    rpcMetrics.addDeferredRpcProcessingTime(processingTime);
+    rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
+  }
+
   /**
    * A convenience method to bind to a given address and report 
    * better exceptions if the address is not a valid host.
@@ -758,6 +763,7 @@ public static class Call implements Schedulable,
     final RPC.RpcKind rpcKind;
     final byte[] clientId;
     private final CallerContext callerContext; // the call context
+    private boolean deferredResponse = false;
     private int priorityLevel;
     // the priority level assigned by scheduler, 0 by default
     private long clientStateId;
@@ -910,6 +916,20 @@ public void markCallCoordinated(boolean flag) {
     public boolean isCallCoordinated() {
       return this.isCallCoordinated;
     }
+
+    public void deferResponse() {
+      this.deferredResponse = true;
+    }
+
+    public boolean isResponseDeferred() {
+      return this.deferredResponse;
+    }
+
+    public void setDeferredResponse(Writable response) {
+    }
+
+    public void setDeferredError(Throwable t) {
+    }
   }
 
   /** A RPC extended call queued for handling. */
@@ -990,21 +1010,27 @@ public Void run() throws Exception {
       } catch (Throwable e) {
         populateResponseParamsOnError(e, responseParams);
       }
-      long deltaNanos = Time.monotonicNowNanos() - startNanos;
-      ProcessingDetails details = getProcessingDetails();
+      if (!isResponseDeferred()) {
+        long deltaNanos = Time.monotonicNowNanos() - startNanos;
+        ProcessingDetails details = getProcessingDetails();
 
-      details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
-      deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
-      deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
-      deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
-      details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
-      startNanos = Time.monotonicNowNanos();
+        details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
+        details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
+        startNanos = Time.monotonicNowNanos();
 
-      setResponseFields(value, responseParams);
-      sendResponse();
+        setResponseFields(value, responseParams);
+        sendResponse();
 
-      deltaNanos = Time.monotonicNowNanos() - startNanos;
-      details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
+        deltaNanos = Time.monotonicNowNanos() - startNanos;
+        details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Deferring response for callId: " + this.callId);
+        }
+      }
       return null;
     }
 
@@ -1064,6 +1090,69 @@ void doResponse(Throwable t, RpcStatusProto status) 
throws IOException {
       connection.sendResponse(call);
     }
 
+    /**
+     * Send a deferred response, ignoring errors.
+     */
+    private void sendDeferedResponse() {
+      try {
+        connection.sendResponse(this);
+      } catch (Exception e) {
+        // For synchronous calls, application code is done once it's returned
+        // from a method. It does not expect to receive an error.
+        // This is equivalent to what happens in synchronous calls when the
+        // Responder is not able to send out the response.
+        LOG.error("Failed to send deferred response. ThreadName=" + Thread
+            .currentThread().getName() + ", CallId="
+            + callId + ", hostname=" + getHostAddress());
+      }
+    }
+
+    @Override
+    public void setDeferredResponse(Writable response) {
+      if (this.connection.getServer().running) {
+        try {
+          setupResponse(this, RpcStatusProto.SUCCESS, null, response,
+              null, null);
+        } catch (IOException e) {
+          // For synchronous calls, application code is done once it has
+          // returned from a method. It does not expect to receive an error.
+          // This is equivalent to what happens in synchronous calls when the
+          // response cannot be sent.
+          LOG.error(
+              "Failed to setup deferred successful response. ThreadName=" +
+                  Thread.currentThread().getName() + ", Call=" + this);
+          return;
+        }
+        sendDeferedResponse();
+      }
+    }
+
+    @Override
+    public void setDeferredError(Throwable t) {
+      if (this.connection.getServer().running) {
+        if (t == null) {
+          t = new IOException(
+              "User code indicated an error without an exception");
+        }
+        try {
+          ResponseParams responseParams = new ResponseParams();
+          populateResponseParamsOnError(t, responseParams);
+          setupResponse(this, responseParams.returnStatus,
+              responseParams.detailedErr,
+              null, responseParams.errorClass, responseParams.error);
+        } catch (IOException e) {
+          // For synchronous calls, application code is done once it has
+          // returned from a method. It does not expect to receive an error.
+          // This is equivalent to what happens in synchronous calls when the
+          // response cannot be sent.
+          LOG.error(
+              "Failed to setup deferred error response. ThreadName=" +
+                  Thread.currentThread().getName() + ", Call=" + this);
+        }
+        sendDeferedResponse();
+      }
+    }
+
     /**
      * Holds response parameters. Defaults set to work for successful
      * invocations
@@ -2885,8 +2974,8 @@ public void run() {
           if (call != null) {
             updateMetrics(call, startTimeNanos, connDropped);
             ProcessingDetails.LOG.debug(
-                "Served: [{}] name={} user={} details={}",
-                call,
+                "Served: [{}]{} name={} user={} details={}",
+                call, (call.isResponseDeferred() ? ", deferred" : ""),
                 call.getDetailedMetricsName(), call.getRemoteUser(),
                 call.getProcessingDetails());
           }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcDetailedMetrics.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcDetailedMetrics.java
index fed64834756..ee9309f2128 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcDetailedMetrics.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcDetailedMetrics.java
@@ -33,6 +33,7 @@
 public class RpcDetailedMetrics {
 
   @Metric MutableRatesWithAggregation rates;
+  @Metric MutableRatesWithAggregation deferredRpcRates;
 
   static final Logger LOG = LoggerFactory.getLogger(RpcDetailedMetrics.class);
   final MetricsRegistry registry;
@@ -58,6 +59,7 @@ public static RpcDetailedMetrics create(int port) {
    */
   public void init(Class<?> protocol) {
     rates.init(protocol);
+    deferredRpcRates.init(protocol);
   }
 
   /**
@@ -70,6 +72,10 @@ public void addProcessingTime(String rpcCallName, long 
processingTime) {
     rates.add(rpcCallName, processingTime);
   }
 
+  public void addDeferredProcessingTime(String name, long processingTime) {
+    deferredRpcRates.add(name, processingTime);
+  }
+
   /**
    * Shutdown the instrumentation for the process
    */
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcMetrics.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcMetrics.java
index b4726d04bbd..4e799837697 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcMetrics.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RpcMetrics.java
@@ -67,6 +67,8 @@ public class RpcMetrics {
           new MutableQuantiles[intervals.length];
       rpcProcessingTimeQuantiles =
           new MutableQuantiles[intervals.length];
+      deferredRpcProcessingTimeQuantiles =
+          new MutableQuantiles[intervals.length];
       for (int i = 0; i < intervals.length; i++) {
         int interval = intervals[i];
         rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
@@ -80,6 +82,10 @@ public class RpcMetrics {
             "rpcProcessingTime" + interval + "s",
             "rpc processing time in " + TIMEUNIT, "ops",
             "latency", interval);
+        deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
+            "deferredRpcProcessingTime" + interval + "s",
+            "deferred rpc processing time in " + TIMEUNIT, "ops",
+            "latency", interval);
       }
     }
     LOG.debug("Initialized " + registry);
@@ -100,6 +106,8 @@ public static RpcMetrics create(Server server, 
Configuration conf) {
   MutableQuantiles[] rpcLockWaitTimeQuantiles;
   @Metric("Processing time") MutableRate rpcProcessingTime;
   MutableQuantiles[] rpcProcessingTimeQuantiles;
+  @Metric("Deferred Processing time") MutableRate deferredRpcProcessingTime;
+  MutableQuantiles[] deferredRpcProcessingTimeQuantiles;
   @Metric("Number of authentication failures")
   MutableCounterLong rpcAuthenticationFailures;
   @Metric("Number of authentication successes")
@@ -228,6 +236,15 @@ public void addRpcProcessingTime(long processingTime) {
     }
   }
 
+  public void addDeferredRpcProcessingTime(long processingTime) {
+    deferredRpcProcessingTime.add(processingTime);
+    if (rpcQuantileEnable) {
+      for (MutableQuantiles q : deferredRpcProcessingTimeQuantiles) {
+        q.add(processingTime);
+      }
+    }
+  }
+
   /**
    * One client backoff event
    */
@@ -282,6 +299,22 @@ public long getRpcSlowCalls() {
     return rpcSlowCalls.value();
   }
 
+  public MutableRate getDeferredRpcProcessingTime() {
+    return deferredRpcProcessingTime;
+  }
+
+  public long getDeferredRpcProcessingSampleCount() {
+    return deferredRpcProcessingTime.lastStat().numSamples();
+  }
+
+  public double getDeferredRpcProcessingMean() {
+    return deferredRpcProcessingTime.lastStat().mean();
+  }
+
+  public double getDeferredRpcProcessingStdDev() {
+    return deferredRpcProcessingTime.lastStat().stddev();
+  }
+
   public MetricsTag getTag(String tagName) {
     return registry.getTag(tagName);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to