Repository: hadoop
Updated Branches:
  refs/heads/trunk 4da5000dd -> d288a0ba8


HADOOP-13465. Design Server.Call to be extensible for unified call queue. 
Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d288a0ba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d288a0ba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d288a0ba

Branch: refs/heads/trunk
Commit: d288a0ba8364d81aacda9f4a21022eecb6dc4e22
Parents: 4da5000
Author: Kihwal Lee <kih...@apache.org>
Authored: Thu Aug 25 11:43:39 2016 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Thu Aug 25 11:44:13 2016 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Server.java | 336 +++++++++++--------
 1 file changed, 191 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d288a0ba/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 4c73f6a..09fe889 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -354,10 +354,9 @@ public abstract class Server {
    */
   public static InetAddress getRemoteIp() {
     Call call = CurCall.get();
-    return (call != null && call.connection != null) ? call.connection
-        .getHostInetAddress() : null;
+    return (call != null ) ? call.getHostInetAddress() : null;
   }
-  
+
   /**
    * Returns the clientId from the current RPC request
    */
@@ -380,10 +379,9 @@ public abstract class Server {
    */
   public static UserGroupInformation getRemoteUser() {
     Call call = CurCall.get();
-    return (call != null && call.connection != null) ? call.connection.user
-        : null;
+    return (call != null) ? call.getRemoteUser() : null;
   }
- 
+
   /** Return true if the invocation was through an RPC.
    */
   public static boolean isRpcInvocation() {
@@ -483,7 +481,7 @@ public abstract class Server {
     if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
         (processingTime > threeSigma)) {
       if(LOG.isWarnEnabled()) {
-        String client = CurCall.get().connection.toString();
+        String client = CurCall.get().toString();
         LOG.warn(
             "Slow RPC : " + methodName + " took " + processingTime +
                 " milliseconds to process from client " + client);
@@ -657,62 +655,65 @@ public abstract class Server {
         CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
   }
 
-  /** A call queued for handling. */
-  public static class Call implements Schedulable {
-    private final int callId;             // the client's call id
-    private final int retryCount;        // the retry count of the call
-    private final Writable rpcRequest;    // Serialized Rpc request from client
-    private final Connection connection;  // connection to client
-    private long timestamp;               // time received when response is 
null
-                                          // time served when response is not 
null
-    private ByteBuffer rpcResponse;       // the response for this call
+  /** A generic call queued for handling. */
+  public static class Call implements Schedulable,
+  PrivilegedExceptionAction<Void> {
+    final int callId;            // the client's call id
+    final int retryCount;        // the retry count of the call
+    long timestamp;              // time received when response is null
+                                 // time served when response is not null
     private AtomicInteger responseWaitCount = new AtomicInteger(1);
-    private final RPC.RpcKind rpcKind;
-    private final byte[] clientId;
+    final RPC.RpcKind rpcKind;
+    final byte[] clientId;
     private final TraceScope traceScope; // the HTrace scope on the server side
     private final CallerContext callerContext; // the call context
     private int priorityLevel;
     // the priority level assigned by scheduler, 0 by default
 
-    private Call(Call call) {
-      this(call.callId, call.retryCount, call.rpcRequest, call.connection,
-          call.rpcKind, call.clientId, call.traceScope, call.callerContext);
+    Call(Call call) {
+      this(call.callId, call.retryCount, call.rpcKind, call.clientId,
+          call.traceScope, call.callerContext);
     }
 
-    public Call(int id, int retryCount, Writable param, 
-        Connection connection) {
-      this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
-          RpcConstants.DUMMY_CLIENT_ID);
+    Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId) {
+      this(id, retryCount, kind, clientId, null, null);
     }
 
-    public Call(int id, int retryCount, Writable param, Connection connection,
+    @VisibleForTesting // primarily TestNamenodeRetryCache
+    public Call(int id, int retryCount, Void ignore1, Void ignore2,
         RPC.RpcKind kind, byte[] clientId) {
-      this(id, retryCount, param, connection, kind, clientId, null, null);
+      this(id, retryCount, kind, clientId, null, null);
     }
 
-    public Call(int id, int retryCount, Writable param, Connection connection,
-        RPC.RpcKind kind, byte[] clientId, TraceScope traceScope,
-        CallerContext callerContext) {
+    Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
+        TraceScope traceScope, CallerContext callerContext) {
       this.callId = id;
       this.retryCount = retryCount;
-      this.rpcRequest = param;
-      this.connection = connection;
       this.timestamp = Time.now();
-      this.rpcResponse = null;
       this.rpcKind = kind;
       this.clientId = clientId;
       this.traceScope = traceScope;
       this.callerContext = callerContext;
     }
-    
+
     @Override
     public String toString() {
-      return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
-          + retryCount;
+      return "Call#" + callId + " Retry#" + retryCount;
     }
 
-    public void setResponse(ByteBuffer response) {
-      this.rpcResponse = response;
+    public Void run() throws Exception {
+      return null;
+    }
+    // should eventually be abstract but need to avoid breaking tests
+    public UserGroupInformation getRemoteUser() {
+      return null;
+    }
+    public InetAddress getHostInetAddress() {
+      return null;
+    }
+    public String getHostAddress() {
+      InetAddress addr = getHostInetAddress();
+      return (addr != null) ? addr.getHostAddress() : null;
     }
 
     /**
@@ -724,34 +725,36 @@ public abstract class Server {
      */
     @InterfaceStability.Unstable
     @InterfaceAudience.LimitedPrivate({"HDFS"})
-    public void postponeResponse() {
+    public final void postponeResponse() {
       int count = responseWaitCount.incrementAndGet();
       assert count > 0 : "response has already been sent";
     }
 
     @InterfaceStability.Unstable
     @InterfaceAudience.LimitedPrivate({"HDFS"})
-    public void sendResponse() throws IOException {
+    public final void sendResponse() throws IOException {
       int count = responseWaitCount.decrementAndGet();
       assert count >= 0 : "response has already been sent";
       if (count == 0) {
-        connection.sendResponse(this);
+        doResponse(null);
       }
     }
 
     @InterfaceStability.Unstable
     @InterfaceAudience.LimitedPrivate({"HDFS"})
-    public void abortResponse(Throwable t) throws IOException {
+    public final void abortResponse(Throwable t) throws IOException {
       // don't send response if the call was already sent or aborted.
       if (responseWaitCount.getAndSet(-1) > 0) {
-        connection.abortResponse(this, t);
+        doResponse(t);
       }
     }
 
+    void doResponse(Throwable t) throws IOException {}
+
     // For Schedulable
     @Override
     public UserGroupInformation getUserGroupInformation() {
-      return connection.user;
+      return getRemoteUser();
     }
 
     @Override
@@ -764,6 +767,114 @@ public abstract class Server {
     }
   }
 
+  /** A RPC extended call queued for handling. */
+  private class RpcCall extends Call {
+    final Connection connection;  // connection to client
+    final Writable rpcRequest;    // Serialized Rpc request from client
+    ByteBuffer rpcResponse;       // the response for this call
+
+    RpcCall(RpcCall call) {
+      super(call);
+      this.connection = call.connection;
+      this.rpcRequest = call.rpcRequest;
+    }
+
+    RpcCall(Connection connection, int id) {
+      this(connection, id, RpcConstants.INVALID_RETRY_COUNT);
+    }
+
+    RpcCall(Connection connection, int id, int retryCount) {
+      this(connection, id, retryCount, null,
+          RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID,
+          null, null);
+    }
+
+    RpcCall(Connection connection, int id, int retryCount,
+        Writable param, RPC.RpcKind kind, byte[] clientId,
+        TraceScope traceScope, CallerContext context) {
+      super(id, retryCount, kind, clientId, traceScope, context);
+      this.connection = connection;
+      this.rpcRequest = param;
+    }
+
+    @Override
+    public UserGroupInformation getRemoteUser() {
+      return connection.user;
+    }
+
+    @Override
+    public InetAddress getHostInetAddress() {
+      return connection.getHostInetAddress();
+    }
+
+    @Override
+    public Void run() throws Exception {
+      if (!connection.channel.isOpen()) {
+        Server.LOG.info(Thread.currentThread().getName() + ": skipped " + 
this);
+        return null;
+      }
+      String errorClass = null;
+      String error = null;
+      RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
+      RpcErrorCodeProto detailedErr = null;
+      Writable value = null;
+
+      try {
+        value = call(
+            rpcKind, connection.protocolName, rpcRequest, timestamp);
+      } catch (Throwable e) {
+        if (e instanceof UndeclaredThrowableException) {
+          e = e.getCause();
+        }
+        logException(Server.LOG, e, this);
+        if (e instanceof RpcServerException) {
+          RpcServerException rse = ((RpcServerException)e);
+          returnStatus = rse.getRpcStatusProto();
+          detailedErr = rse.getRpcErrorCodeProto();
+        } else {
+          returnStatus = RpcStatusProto.ERROR;
+          detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
+        }
+        errorClass = e.getClass().getName();
+        error = StringUtils.stringifyException(e);
+        // Remove redundant error class name from the beginning of the
+        // stack trace
+        String exceptionHdr = errorClass + ": ";
+        if (error.startsWith(exceptionHdr)) {
+          error = error.substring(exceptionHdr.length());
+        }
+      }
+      setupResponse(this, returnStatus, detailedErr,
+          value, errorClass, error);
+      sendResponse();
+      return null;
+    }
+
+    void setResponse(ByteBuffer response) throws IOException {
+      this.rpcResponse = response;
+    }
+
+    @Override
+    void doResponse(Throwable t) throws IOException {
+      RpcCall call = this;
+      if (t != null) {
+        // clone the call to prevent a race with another thread stomping
+        // on the response while being sent.  the original call is
+        // effectively discarded since the wait count won't hit zero
+        call = new RpcCall(this);
+        setupResponse(call,
+            RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
+            null, t.getClass().getName(), StringUtils.stringifyException(t));
+      }
+      connection.sendResponse(this);
+    }
+
+    @Override
+    public String toString() {
+      return super.toString() + " " + rpcRequest + " from " + connection;
+    }
+  }
+
   /** Listens on the socket. Creates jobs for the handler threads*/
   private class Listener extends Thread {
     
@@ -1094,22 +1205,22 @@ public abstract class Server {
           if(LOG.isDebugEnabled()) {
             LOG.debug("Checking for old call responses.");
           }
-          ArrayList<Call> calls;
+          ArrayList<RpcCall> calls;
           
           // get the list of channels from list of keys.
           synchronized (writeSelector.keys()) {
-            calls = new ArrayList<Call>(writeSelector.keys().size());
+            calls = new ArrayList<RpcCall>(writeSelector.keys().size());
             iter = writeSelector.keys().iterator();
             while (iter.hasNext()) {
               SelectionKey key = iter.next();
-              Call call = (Call)key.attachment();
+              RpcCall call = (RpcCall)key.attachment();
               if (call != null && key.channel() == call.connection.channel) { 
                 calls.add(call);
               }
             }
           }
-          
-          for(Call call : calls) {
+
+          for (RpcCall call : calls) {
             doPurge(call, now);
           }
         } catch (OutOfMemoryError e) {
@@ -1127,7 +1238,7 @@ public abstract class Server {
     }
 
     private void doAsyncWrite(SelectionKey key) throws IOException {
-      Call call = (Call)key.attachment();
+      RpcCall call = (RpcCall)key.attachment();
       if (call == null) {
         return;
       }
@@ -1155,10 +1266,10 @@ public abstract class Server {
     // Remove calls that have been pending in the responseQueue 
     // for a long time.
     //
-    private void doPurge(Call call, long now) {
-      LinkedList<Call> responseQueue = call.connection.responseQueue;
+    private void doPurge(RpcCall call, long now) {
+      LinkedList<RpcCall> responseQueue = call.connection.responseQueue;
       synchronized (responseQueue) {
-        Iterator<Call> iter = responseQueue.listIterator(0);
+        Iterator<RpcCall> iter = responseQueue.listIterator(0);
         while (iter.hasNext()) {
           call = iter.next();
           if (now > call.timestamp + PURGE_INTERVAL) {
@@ -1172,12 +1283,12 @@ public abstract class Server {
     // Processes one response. Returns true if there are no more pending
     // data for this channel.
     //
-    private boolean processResponse(LinkedList<Call> responseQueue,
+    private boolean processResponse(LinkedList<RpcCall> responseQueue,
                                     boolean inHandler) throws IOException {
       boolean error = true;
       boolean done = false;       // there is more data for this channel.
       int numElements = 0;
-      Call call = null;
+      RpcCall call = null;
       try {
         synchronized (responseQueue) {
           //
@@ -1260,7 +1371,7 @@ public abstract class Server {
     //
     // Enqueue a response from the application.
     //
-    void doRespond(Call call) throws IOException {
+    void doRespond(RpcCall call) throws IOException {
       synchronized (call.connection.responseQueue) {
         // must only wrap before adding to the responseQueue to prevent
         // postponed responses from being encrypted and sent out of order.
@@ -1358,7 +1469,7 @@ public abstract class Server {
     private SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
-    private LinkedList<Call> responseQueue;
+    private LinkedList<RpcCall> responseQueue;
     // number of outstanding rpcs
     private AtomicInteger rpcCount = new AtomicInteger();
     private long lastContact;
@@ -1385,8 +1496,8 @@ public abstract class Server {
     public UserGroupInformation attemptingUser = null; // user name before auth
 
     // Fake 'call' for failed authorization response
-    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
-        RpcConstants.INVALID_RETRY_COUNT, null, this);
+    private final RpcCall authFailedCall =
+        new RpcCall(this, AUTHORIZATION_FAILED_CALL_ID);
 
     private boolean sentNegotiate = false;
     private boolean useWrap = false;
@@ -1409,7 +1520,7 @@ public abstract class Server {
         this.hostAddress = addr.getHostAddress();
       }
       this.remotePort = socket.getPort();
-      this.responseQueue = new LinkedList<Call>();
+      this.responseQueue = new LinkedList<RpcCall>();
       if (socketSendBufferSize != 0) {
         try {
           socket.setSendBufferSize(socketSendBufferSize);
@@ -1704,8 +1815,7 @@ public abstract class Server {
     }
 
     private void doSaslReply(Message message) throws IOException {
-      final Call saslCall = new Call(AuthProtocol.SASL.callId,
-          RpcConstants.INVALID_RETRY_COUNT, null, this);
+      final RpcCall saslCall = new RpcCall(this, AuthProtocol.SASL.callId);
       setupResponse(saslCall,
           RpcStatusProto.SUCCESS, null,
           RpcWritable.wrap(message), null, null);
@@ -1922,23 +2032,20 @@ public abstract class Server {
       
       if (clientVersion >= 9) {
         // Versions >>9  understand the normal response
-        Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
-            this);
+        RpcCall fakeCall = new RpcCall(this, -1);
         setupResponse(fakeCall,
             RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
             null, VersionMismatch.class.getName(), errMsg);
         fakeCall.sendResponse();
       } else if (clientVersion >= 3) {
-        Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
-            this);
+        RpcCall fakeCall = new RpcCall(this, -1);
         // Versions 3 to 8 use older response
         setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
         fakeCall.sendResponse();
       } else if (clientVersion == 2) { // Hadoop 0.18.3
-        Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
-            this);
+        RpcCall fakeCall = new RpcCall(this, 0);
         DataOutputStream out = new DataOutputStream(buffer);
         out.writeInt(0); // call ID
         out.writeBoolean(true); // error
@@ -1950,7 +2057,7 @@ public abstract class Server {
     }
     
     private void setupHttpRequestOnIpcPortResponse() throws IOException {
-      Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, 
this);
+      RpcCall fakeCall = new RpcCall(this, 0);
       fakeCall.setResponse(ByteBuffer.wrap(
           RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8)));
       fakeCall.sendResponse();
@@ -2098,7 +2205,7 @@ public abstract class Server {
         }
       } catch (WrappedRpcServerException wrse) { // inform client of error
         Throwable ioe = wrse.getCause();
-        final Call call = new Call(callId, retry, null, this);
+        final RpcCall call = new RpcCall(this, callId, retry);
         setupResponse(call,
             RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
             ioe.getClass().getName(), ioe.getMessage());
@@ -2198,8 +2305,9 @@ public abstract class Server {
                 .build();
       }
 
-      Call call = new Call(header.getCallId(), header.getRetryCount(),
-          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
+      RpcCall call = new RpcCall(this, header.getCallId(),
+          header.getRetryCount(), rpcRequest,
+          ProtoUtil.convert(header.getRpcKind()),
           header.getClientId().toByteArray(), traceScope, callerContext);
 
       // Save the priority level assignment by the scheduler
@@ -2323,21 +2431,10 @@ public abstract class Server {
       }
     }
 
-    private void sendResponse(Call call) throws IOException {
+    private void sendResponse(RpcCall call) throws IOException {
       responder.doRespond(call);
     }
 
-    private void abortResponse(Call call, Throwable t) throws IOException {
-      // clone the call to prevent a race with the other thread stomping
-      // on the response while being sent.  the original call is
-      // effectively discarded since the wait count won't hit zero
-      call = new Call(call);
-      setupResponse(call,
-          RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
-          null, t.getClass().getName(), StringUtils.stringifyException(t));
-      call.sendResponse();
-    }
-
     /**
      * Get service class for connection
      * @return the serviceClass
@@ -2388,16 +2485,6 @@ public abstract class Server {
           if (LOG.isDebugEnabled()) {
             LOG.debug(Thread.currentThread().getName() + ": " + call + " for 
RpcKind " + call.rpcKind);
           }
-          if (!call.connection.channel.isOpen()) {
-            LOG.info(Thread.currentThread().getName() + ": skipped " + call);
-            continue;
-          }
-          String errorClass = null;
-          String error = null;
-          RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
-          RpcErrorCodeProto detailedErr = null;
-          Writable value = null;
-
           CurCall.set(call);
           if (call.traceScope != null) {
             call.traceScope.reattach();
@@ -2406,53 +2493,11 @@ public abstract class Server {
           }
           // always update the current call context
           CallerContext.setCurrent(call.callerContext);
-
-          try {
-            // Make the call as the user via Subject.doAs, thus associating
-            // the call with the Subject
-            if (call.connection.user == null) {
-              value = call(call.rpcKind, call.connection.protocolName, 
call.rpcRequest, 
-                           call.timestamp);
-            } else {
-              value = 
-                call.connection.user.doAs
-                  (new PrivilegedExceptionAction<Writable>() {
-                     @Override
-                     public Writable run() throws Exception {
-                       // make the call
-                       return call(call.rpcKind, call.connection.protocolName, 
-                                   call.rpcRequest, call.timestamp);
-
-                     }
-                   }
-                  );
-            }
-          } catch (Throwable e) {
-            if (e instanceof UndeclaredThrowableException) {
-              e = e.getCause();
-            }
-            logException(LOG, e, call);
-            if (e instanceof RpcServerException) {
-              RpcServerException rse = ((RpcServerException)e); 
-              returnStatus = rse.getRpcStatusProto();
-              detailedErr = rse.getRpcErrorCodeProto();
-            } else {
-              returnStatus = RpcStatusProto.ERROR;
-              detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
-            }
-            errorClass = e.getClass().getName();
-            error = StringUtils.stringifyException(e);
-            // Remove redundant error class name from the beginning of the 
stack trace
-            String exceptionHdr = errorClass + ": ";
-            if (error.startsWith(exceptionHdr)) {
-              error = error.substring(exceptionHdr.length());
-            }
-          }
-          CurCall.set(null);
-          synchronized (call.connection.responseQueue) {
-            setupResponse(call, returnStatus, detailedErr,
-                value, errorClass, error);
-            call.sendResponse();
+          UserGroupInformation remoteUser = call.getRemoteUser();
+          if (remoteUser != null) {
+            remoteUser.doAs(call);
+          } else {
+            call.run();
           }
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
@@ -2469,6 +2514,7 @@ public abstract class Server {
                 StringUtils.stringifyException(e));
           }
         } finally {
+          CurCall.set(null);
           IOUtils.cleanup(LOG, traceScope);
         }
       }
@@ -2670,7 +2716,7 @@ public abstract class Server {
    * @throws IOException
    */
   private void setupResponse(
-      Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
+      RpcCall call, RpcStatusProto status, RpcErrorCodeProto erCode,
       Writable rv, String errorClass, String error)
           throws IOException {
     RpcResponseHeaderProto.Builder headerBuilder =
@@ -2704,7 +2750,7 @@ public abstract class Server {
     }
   }
 
-  private void setupResponse(Call call,
+  private void setupResponse(RpcCall call,
       RpcResponseHeaderProto header, Writable rv) throws IOException {
     ResponseBuffer buf = responseBuffer.get().reset();
     try {
@@ -2738,7 +2784,7 @@ public abstract class Server {
    * @throws IOException
    */
   private void setupResponseOldVersionFatal(ByteArrayOutputStream response, 
-                             Call call,
+                             RpcCall call,
                              Writable rv, String errorClass, String error) 
   throws IOException {
     final int OLD_VERSION_FATAL_STATUS = -1;
@@ -2751,7 +2797,7 @@ public abstract class Server {
     call.setResponse(ByteBuffer.wrap(response.toByteArray()));
   }
 
-  private void wrapWithSasl(Call call) throws IOException {
+  private void wrapWithSasl(RpcCall call) throws IOException {
     if (call.connection.saslServer != null) {
       byte[] token = call.rpcResponse.array();
       // synchronization may be needed since there can be multiple Handler


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to