This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 6f5e5e4  HBASE-23938 : System table hbase:slowlog to store complete 
slow/large… (#1681)
6f5e5e4 is described below

commit 6f5e5e4828e965994ab20311979002d708bfc59f
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed May 20 15:10:29 2020 +0530

    HBASE-23938 : System table hbase:slowlog to store complete slow/large… 
(#1681)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Anoop Sam John <anoopsamj...@apache.org>
    Signed-off-by: ramkrish86 <ramkris...@apache.org>
---
 .../hadoop/hbase/slowlog/SlowLogTableAccessor.java | 150 +++++++++++++++
 .../java/org/apache/hadoop/hbase/HConstants.java   |  10 +
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |   2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |   3 +
 .../hbase/master/slowlog/SlowLogMasterService.java |  73 ++++++++
 .../hadoop/hbase/regionserver/HRegionServer.java   |  15 ++
 .../regionserver/slowlog/LogEventHandler.java      |  80 +++++++-
 .../hbase/regionserver/slowlog/RpcLogDetails.java  |  11 +-
 .../regionserver/slowlog/SlowLogRecorder.java      |  14 +-
 .../regionserver/slowlog/SlowLogTableOpsChore.java |  63 +++++++
 .../regionserver/slowlog/TestSlowLogAccessor.java  | 204 +++++++++++++++++++++
 .../regionserver/slowlog/TestSlowLogRecorder.java  |  15 +-
 12 files changed, 619 insertions(+), 21 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
new file mode 100644
index 0000000..f4f29c6
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.slowlog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Slowlog Accessor to record slow/large RPC log identified at each RegionServer RpcServer level.
+ * This can be done only optionally to record the entire history of slow/large rpc calls
+ * since RingBuffer can handle only limited latest records.
+ */
+@InterfaceAudience.Private
+public class SlowLogTableAccessor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);
+
+  private static final Random RANDOM = new Random();
+
+  /** Fixed number of decimal digits appended after the timestamp in every row key. */
+  private static final int ROW_KEY_SUFFIX_DIGITS = 5;
+
+  /**
+   * Lazily created connection, shared by every call to
+   * {@link #addSlowLogRecords(List, Configuration)}. Declared volatile because it is read
+   * outside of the monitor that guards its creation.
+   */
+  private static volatile Connection connection;
+
+  /**
+   * hbase:slowlog table name - can be enabled
+   * with config - hbase.regionserver.slowlog.systable.enabled
+   */
+  public static final TableName SLOW_LOG_TABLE_NAME =
+    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "slowlog");
+
+  /**
+   * Write the given puts to the hbase:slowlog table in a single batch.
+   * @param connection Connection used to obtain the table
+   * @param puts Puts to write
+   * @throws IOException if obtaining the table or the batch put fails
+   */
+  private static void doPut(final Connection connection, final List<Put> puts)
+      throws IOException {
+    try (Table table = connection.getTable(SLOW_LOG_TABLE_NAME)) {
+      table.put(puts);
+    }
+  }
+
+  /**
+   * Add slow/large log records to hbase:slowlog table. Failures are logged at WARN level
+   * and never propagated to the caller.
+   * @param slowLogPayloads List of SlowLogPayload to process
+   * @param configuration Configuration to use for connection
+   */
+  public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
+      final Configuration configuration) {
+    if (slowLogPayloads == null || slowLogPayloads.isEmpty()) {
+      // Nothing to write: do not create a connection or issue an empty batch.
+      return;
+    }
+    List<Put> puts = new ArrayList<>(slowLogPayloads.size());
+    for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
+      final byte[] rowKey = getRowKey(slowLogPayload);
+      final Put put = new Put(rowKey).setDurability(Durability.SKIP_WAL)
+        .setPriority(HConstants.NORMAL_QOS)
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("call_details"),
+          Bytes.toBytes(slowLogPayload.getCallDetails()))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("client_address"),
+          Bytes.toBytes(slowLogPayload.getClientAddress()))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("method_name"),
+          Bytes.toBytes(slowLogPayload.getMethodName()))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("param"),
+          Bytes.toBytes(slowLogPayload.getParam()))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("processing_time"),
+          Bytes.toBytes(Integer.toString(slowLogPayload.getProcessingTime())))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("queue_time"),
+          Bytes.toBytes(Integer.toString(slowLogPayload.getQueueTime())))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("region_name"),
+          Bytes.toBytes(slowLogPayload.getRegionName()))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("response_size"),
+          Bytes.toBytes(Long.toString(slowLogPayload.getResponseSize())))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("server_class"),
+          Bytes.toBytes(slowLogPayload.getServerClass()))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("start_time"),
+          Bytes.toBytes(Long.toString(slowLogPayload.getStartTime())))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("type"),
+          Bytes.toBytes(slowLogPayload.getType().name()))
+        .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("username"),
+          Bytes.toBytes(slowLogPayload.getUserName()));
+      puts.add(put);
+    }
+    try {
+      if (connection == null) {
+        createConnection(configuration);
+      }
+      doPut(connection, puts);
+    } catch (Exception e) {
+      // Best effort: losing slow log records must not affect the caller.
+      LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
+    }
+  }
+
+  /**
+   * Create the shared connection if it does not exist yet. The connection uses a copy of
+   * the given configuration with a 20s rpc timeout and 5 client retries.
+   * @param configuration Base configuration to copy
+   * @throws IOException if the connection cannot be created
+   */
+  private static synchronized void createConnection(Configuration configuration)
+      throws IOException {
+    if (connection != null) {
+      // Another thread created the connection while this one waited for the monitor;
+      // creating a second one would leak the first.
+      return;
+    }
+    Configuration conf = new Configuration(configuration);
+    // rpc timeout: 20s
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
+    // retry count: 5
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
+    connection = ConnectionFactory.createConnection(conf);
+  }
+
+  /**
+   * Create rowKey: currentTimeMillis APPEND a five digit suffix taken from
+   * slowLogPayload.hashcode (or a random number when the short hashcode is negative).
+   * The suffix is left-padded with zeros so that it always has exactly five digits;
+   * a shorter suffix would shrink the resulting long by powers of ten and break the time
+   * ordering of the keys.
+   * Scan on slowlog table should keep records with sorted order of time, however records
+   * added at the very same time (currentTimeMillis) could be in random order.
+   *
+   * @param slowLogPayload SlowLogPayload to process
+   * @return rowKey byte[]
+   */
+  private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) {
+    String hashcode = String.valueOf(slowLogPayload.hashCode());
+    String lastFiveDig = hashcode.substring(
+      (hashcode.length() > ROW_KEY_SUFFIX_DIGITS)
+        ? (hashcode.length() - ROW_KEY_SUFFIX_DIGITS) : 0);
+    if (lastFiveDig.startsWith("-")) {
+      lastFiveDig = String.valueOf(RANDOM.nextInt(99999));
+    }
+    final StringBuilder suffix = new StringBuilder(ROW_KEY_SUFFIX_DIGITS);
+    for (int i = lastFiveDig.length(); i < ROW_KEY_SUFFIX_DIGITS; i++) {
+      suffix.append('0');
+    }
+    suffix.append(lastFiveDig);
+    final long currentTimeMillis = EnvironmentEdgeManager.currentTime();
+    final String timeAndHashcode = String.valueOf(currentTimeMillis) + suffix;
+    final long rowKeyLong = Long.parseLong(timeAndHashcode);
+    return Bytes.toBytes(rowKeyLong);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index fdc3532..8c3d295 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1563,6 +1563,16 @@ public final class HConstants {
     "hbase.regionserver.slowlog.buffer.enabled";
   public static final boolean DEFAULT_ONLINE_LOG_PROVIDER_ENABLED = false;
 
+  /** The slowlog info family as a string */
+  private static final String SLOWLOG_INFO_FAMILY_STR = "info";
+
+  /** The slowlog info family */
+  public static final byte [] SLOWLOG_INFO_FAMILY = 
Bytes.toBytes(SLOWLOG_INFO_FAMILY_STR);
+
+  public static final String SLOW_LOG_SYS_TABLE_ENABLED_KEY =
+    "hbase.regionserver.slowlog.systable.enabled";
+  public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index d49122a..3a59a4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -435,7 +435,7 @@ public abstract class RpcServer implements 
RpcServerInterface,
           final String className = server == null ? StringUtils.EMPTY :
             server.getClass().getSimpleName();
           this.slowLogRecorder.addSlowLogPayload(
-            new RpcLogDetails(call, status.getClient(), responseSize, 
className, tooSlow,
+            new RpcLogDetails(call, param, status.getClient(), responseSize, 
className, tooSlow,
               tooLarge));
         }
       }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 482e9bb..26f0092 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -143,6 +143,7 @@ import 
org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
+import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
 import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
@@ -1139,6 +1140,8 @@ public class HMaster extends HRegionServer implements 
MasterServices {
       // Start the chore to read snapshots and add their usage to table/NS 
quotas
       getChoreService().scheduleChore(snapshotQuotaChore);
     }
+    final SlowLogMasterService slowLogMasterService = new 
SlowLogMasterService(conf, this);
+    slowLogMasterService.init();
 
     // clear the dead servers with same host name and port of online server 
because we are not
     // removing dead server with same hostname and port of rs which is trying 
to check in before
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java
new file mode 100644
index 0000000..554ed88
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.slowlog;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Master-side slowlog service. Used by HMaster to create the hbase:slowlog system table
+ * when logging of slow/large requests to that table is enabled.
+ */
+@InterfaceAudience.Private
+public class SlowLogMasterService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SlowLogMasterService.class);
+
+  private final boolean slowlogTableEnabled;
+  private final MasterServices masterServices;
+
+  /** Descriptor builder for hbase:slowlog: one replica, single uncached local-scope family. */
+  private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER =
+    TableDescriptorBuilder.newBuilder(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)
+      .setRegionReplication(1)
+      .setColumnFamily(
+        ColumnFamilyDescriptorBuilder.newBuilder(HConstants.SLOWLOG_INFO_FAMILY)
+          .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+          .setBlockCacheEnabled(false)
+          .setMaxVersions(1).build());
+
+  /**
+   * @param configuration Configuration from which the systable enabled flag is read
+   * @param masterServices Master services used to look up and create the table
+   */
+  public SlowLogMasterService(final Configuration configuration,
+      final MasterServices masterServices) {
+    this.masterServices = masterServices;
+    this.slowlogTableEnabled = configuration.getBoolean(
+      HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
+      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
+  }
+
+  /**
+   * Create the hbase:slowlog table if the feature is enabled and the table is missing.
+   * @throws IOException if the existence check or the table creation fails
+   */
+  public void init() throws IOException {
+    if (slowlogTableEnabled) {
+      final boolean tablePresent = MetaTableAccessor.tableExists(
+        masterServices.getConnection(), SlowLogTableAccessor.SLOW_LOG_TABLE_NAME);
+      if (!tablePresent) {
+        LOG.info("slowlog table not found. Creating.");
+        this.masterServices.createSystemTable(TABLE_DESCRIPTOR_BUILDER.build());
+      }
+      return;
+    }
+    LOG.info("Slow/Large requests logging to system table hbase:slowlog is disabled. Quitting.");
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index c76055e..cd0cc3a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -138,6 +138,7 @@ import 
org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
 import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
 import 
org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -427,6 +428,8 @@ public class HRegionServer extends HasThread implements
 
   private final RegionServerAccounting regionServerAccounting;
 
+  private SlowLogTableOpsChore slowLogTableOpsChore = null;
+
   // Block cache
   private BlockCache blockCache;
   // The cache for mob files
@@ -2011,6 +2014,9 @@ public class HRegionServer extends HasThread implements
     if (this.storefileRefresher != null) 
choreService.scheduleChore(storefileRefresher);
     if (this.movedRegionsCleaner != null) 
choreService.scheduleChore(movedRegionsCleaner);
     if (this.fsUtilizationChore != null) 
choreService.scheduleChore(fsUtilizationChore);
+    if (this.slowLogTableOpsChore != null) {
+      choreService.scheduleChore(slowLogTableOpsChore);
+    }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
@@ -2056,6 +2062,14 @@ public class HRegionServer extends HasThread implements
     this.periodicFlusher = new 
PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
     this.leaseManager = new LeaseManager(this.threadWakeFrequency);
 
+    final boolean isSlowLogTableEnabled = 
conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
+      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
+    if (isSlowLogTableEnabled) {
+      // default chore duration: 10 min
+      final int duration = 
conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
+      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, 
this.slowLogRecorder);
+    }
+
     // Create the thread to clean the moved regions list
     movedRegionsCleaner = MovedRegionsCleaner.create(this);
 
@@ -2559,6 +2573,7 @@ public class HRegionServer extends HasThread implements
       choreService.cancelChore(storefileRefresher);
       choreService.cancelChore(movedRegionsCleaner);
       choreService.cancelChore(fsUtilizationChore);
+      choreService.cancelChore(slowLogTableOpsChore);
       // clean up the remaining scheduled chores (in case we missed out any)
       choreService.shutdown();
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
index 508f086..8d500de 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
@@ -27,11 +27,14 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.SlowLogParams;
 import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,11 +57,32 @@ class LogEventHandler implements 
EventHandler<RingBufferEnvelope> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LogEventHandler.class);
 
-  private final Queue<SlowLogPayload> queue;
+  private static final String SYS_TABLE_QUEUE_SIZE =
+    "hbase.regionserver.slowlog.systable.queue.size";
+  private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
+  private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
 
-  LogEventHandler(int eventCount) {
+  private final Queue<SlowLogPayload> queueForRingBuffer;
+  private final Queue<SlowLogPayload> queueForSysTable;
+  private final boolean isSlowLogTableEnabled;
+
+  private Configuration configuration;
+
+  private static final ReentrantLock LOCK = new ReentrantLock();
+
+  LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration 
conf) {
+    this.configuration = conf;
     EvictingQueue<SlowLogPayload> evictingQueue = 
EvictingQueue.create(eventCount);
-    queue = Queues.synchronizedQueue(evictingQueue);
+    queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
+    this.isSlowLogTableEnabled = isSlowLogTableEnabled;
+    if (isSlowLogTableEnabled) {
+      int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, 
DEFAULT_SYS_TABLE_QUEUE_SIZE);
+      EvictingQueue<SlowLogPayload> evictingQueueForTable =
+        EvictingQueue.create(sysTableQueueSize);
+      queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
+    } else {
+      queueForSysTable = null;
+    }
   }
 
   /**
@@ -83,7 +107,7 @@ class LogEventHandler implements 
EventHandler<RingBufferEnvelope> {
       return;
     }
     Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
-    Message param = rpcCall.getParam();
+    Message param = rpcCallDetails.getParam();
     long receiveTime = rpcCall.getReceiveTime();
     long startTime = rpcCall.getStartTime();
     long endTime = System.currentTimeMillis();
@@ -129,7 +153,12 @@ class LogEventHandler implements 
EventHandler<RingBufferEnvelope> {
       .setType(type)
       .setUserName(userName)
       .build();
-    queue.add(slowLogPayload);
+    queueForRingBuffer.add(slowLogPayload);
+    if (isSlowLogTableEnabled) {
+      if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
+        queueForSysTable.add(slowLogPayload);
+      }
+    }
   }
 
   private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
@@ -160,7 +189,7 @@ class LogEventHandler implements 
EventHandler<RingBufferEnvelope> {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Received request to clean up online slowlog buffer..");
     }
-    queue.clear();
+    queueForRingBuffer.clear();
     return true;
   }
 
@@ -172,7 +201,7 @@ class LogEventHandler implements 
EventHandler<RingBufferEnvelope> {
    */
   List<SlowLogPayload> getSlowLogPayloads(final 
AdminProtos.SlowLogResponseRequest request) {
     List<SlowLogPayload> slowLogPayloadList =
-      Arrays.stream(queue.toArray(new SlowLogPayload[0]))
+      Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
         .filter(e -> e.getType() == SlowLogPayload.Type.ALL
           || e.getType() == SlowLogPayload.Type.SLOW_LOG)
         .collect(Collectors.toList());
@@ -191,7 +220,7 @@ class LogEventHandler implements 
EventHandler<RingBufferEnvelope> {
    */
   List<SlowLogPayload> getLargeLogPayloads(final 
AdminProtos.SlowLogResponseRequest request) {
     List<SlowLogPayload> slowLogPayloadList =
-      Arrays.stream(queue.toArray(new SlowLogPayload[0]))
+      Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
         .filter(e -> e.getType() == SlowLogPayload.Type.ALL
           || e.getType() == SlowLogPayload.Type.LARGE_LOG)
         .collect(Collectors.toList());
@@ -207,8 +236,7 @@ class LogEventHandler implements 
EventHandler<RingBufferEnvelope> {
     if (isFilterProvided(request)) {
       logPayloadList = filterLogs(request, logPayloadList);
     }
-    int limit = request.getLimit() >= logPayloadList.size() ? 
logPayloadList.size()
-      : request.getLimit();
+    int limit = Math.min(request.getLimit(), logPayloadList.size());
     return logPayloadList.subList(0, limit);
   }
 
@@ -256,4 +284,36 @@ class LogEventHandler implements 
EventHandler<RingBufferEnvelope> {
     return filteredSlowLogPayloads;
   }
 
+  /**
+   * Poll from queueForSysTable and insert the records in hbase:slowlog table, at most
+   * 100 records (SYSTABLE_PUT_BATCH_SIZE) per batch. Returns immediately when the system
+   * table queue does not exist or when another invocation is already draining the queue.
+   */
+  void addAllLogsToSysTable() {
+    if (queueForSysTable == null) {
+      // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
+      return;
+    }
+    // tryLock() tests and acquires in one atomic step. A separate isLocked() check followed
+    // by lock() lets a second caller slip past the check and then block on the lock.
+    if (!LOCK.tryLock()) {
+      return;
+    }
+    try {
+      List<SlowLogPayload> slowLogPayloads = new ArrayList<>(SYSTABLE_PUT_BATCH_SIZE);
+      SlowLogPayload slowLogPayload;
+      // poll() until null so that a null element can never end up in a batch.
+      while ((slowLogPayload = queueForSysTable.poll()) != null) {
+        slowLogPayloads.add(slowLogPayload);
+        if (slowLogPayloads.size() == SYSTABLE_PUT_BATCH_SIZE) {
+          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+          slowLogPayloads.clear();
+        }
+      }
+      if (!slowLogPayloads.isEmpty()) {
+        // Flush the last, partially filled batch.
+        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+      }
+    } finally {
+      LOCK.unlock();
+    }
+  }
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
index 7d5558c..b469cdb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.slowlog;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -30,15 +31,17 @@ import org.apache.yetus.audience.InterfaceAudience;
 public class RpcLogDetails {
 
   private final RpcCall rpcCall;
+  private final Message param;
   private final String clientAddress;
   private final long responseSize;
   private final String className;
   private final boolean isSlowLog;
   private final boolean isLargeLog;
 
-  public RpcLogDetails(RpcCall rpcCall, String clientAddress, long 
responseSize,
+  public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, 
long responseSize,
       String className, boolean isSlowLog, boolean isLargeLog) {
     this.rpcCall = rpcCall;
+    this.param = param;
     this.clientAddress = clientAddress;
     this.responseSize = responseSize;
     this.className = className;
@@ -70,10 +73,15 @@ public class RpcLogDetails {
     return isLargeLog;
   }
 
+  /**
+   * @return the request parameter message that was passed to the constructor
+   */
+  public Message getParam() {
+    return param;
+  }
+
   @Override
   public String toString() {
     return new ToStringBuilder(this)
       .append("rpcCall", rpcCall)
+      .append("param", param)
       .append("clientAddress", clientAddress)
       .append("responseSize", responseSize)
       .append("className", className)
@@ -81,5 +89,4 @@ public class RpcLogDetails {
       .append("isLargeLog", isLargeLog)
       .toString();
   }
-
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
index a69b0ad..b0fb3e7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
@@ -29,6 +29,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -86,7 +87,9 @@ public class SlowLogRecorder {
     this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
 
     // initialize ringbuffer event handler
-    this.logEventHandler = new LogEventHandler(this.eventCount);
+    final boolean isSlowLogTableEnabled = 
conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
+      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
+    this.logEventHandler = new LogEventHandler(this.eventCount, 
isSlowLogTableEnabled, conf);
     this.disruptor.handleEventsWith(new 
LogEventHandler[]{this.logEventHandler});
     this.disruptor.start();
   }
@@ -161,4 +164,13 @@ public class SlowLogRecorder {
     }
   }
 
+  /**
+   * Delegate to the ring buffer event handler, which polls queueForSysTable and inserts
+   * the records in hbase:slowlog table. Does nothing when no event handler is present.
+   */
+  public void addAllLogsToSysTable() {
+    if (this.logEventHandler == null) {
+      return;
+    }
+    this.logEventHandler.addAllLogsToSysTable();
+  }
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
new file mode 100644
index 0000000..77749f7
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduled chore that asks the {@link SlowLogRecorder} to write the slow/large logs
+ * accumulated so far to the hbase:slowlog system table.
+ */
+@InterfaceAudience.Private
+public class SlowLogTableOpsChore extends ScheduledChore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
+
+  private final SlowLogRecorder slowLogRecorder;
+
+  /**
+   * Chore Constructor
+   *
+   * @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will
+   *   cancel and cleanup
+   * @param period Period in millis with which this Chore repeats execution when scheduled
+   * @param slowLogRecorder {@link SlowLogRecorder} instance
+   */
+  public SlowLogTableOpsChore(final Stoppable stopper, final int period,
+      final SlowLogRecorder slowLogRecorder) {
+    super("SlowLogTableOpsChore", stopper, period);
+    this.slowLogRecorder = slowLogRecorder;
+  }
+
+  /**
+   * One run of the chore: flush the accumulated logs through the recorder, with trace
+   * messages before and after.
+   */
+  @Override
+  protected void chore() {
+    // The logger performs the level check itself; the messages are constant strings.
+    LOG.trace("SlowLog Table Ops Chore is starting up.");
+    slowLogRecorder.addAllLogsToSysTable();
+    LOG.trace("SlowLog Table Ops Chore is closing.");
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
new file mode 100644
index 0000000..e08ad29
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for SlowLog System Table.
+ * <p>
+ * Starts a mini cluster with the slow log ring buffer and the slow log system table enabled,
+ * pushes {@link RpcLogDetails} into the {@link SlowLogRecorder} of region server 0 and then
+ * waits for the records to show up both in the recorder and in the system table.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestSlowLogAccessor {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSlowLogAccessor.class);
+
+  // Log under this test's own class name, not under TestSlowLogRecorder.
+  private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogAccessor.class);
+
+  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
+
+  private SlowLogRecorder slowLogRecorder;
+
+  /**
+   * Enables the slow log buffer and system table in the configuration and starts the mini
+   * cluster.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    try {
+      HBASE_TESTING_UTILITY.shutdownMiniHBaseCluster();
+    } catch (IOException e) {
+      LOG.debug("No worries.");
+    }
+    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
+    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
+    conf.setBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, true);
+    conf.setInt("hbase.slowlog.systable.chore.duration", 900);
+    conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", 50000);
+    HBASE_TESTING_UTILITY.startMiniCluster();
+  }
+
+  /**
+   * Stops everything that {@link #setup()} started via startMiniCluster(), not only the HBase
+   * part of the mini cluster.
+   */
+  @AfterClass
+  public static void teardown() throws Exception {
+    HBASE_TESTING_UTILITY.shutdownMiniCluster();
+  }
+
+  /**
+   * Fetches the private slowLogRecorder field of region server 0 through reflection so that the
+   * tests can feed it directly.
+   */
+  @Before
+  public void setUp() throws Exception {
+    HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
+    Field slowLogRecorderField = HRegionServer.class.getDeclaredField("slowLogRecorder");
+    slowLogRecorderField.setAccessible(true);
+    this.slowLogRecorder = (SlowLogRecorder) slowLogRecorderField.get(hRegionServer);
+  }
+
+  /**
+   * Adds 14 records to the recorder and expects exactly 14 of them in the recorder and in the
+   * system table.
+   */
+  @Test
+  public void testSlowLogRecords() throws Exception {
+
+    AdminProtos.SlowLogResponseRequest request =
+      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
+
+    slowLogRecorder.clearSlowLogPayloads();
+    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request).size());
+
+    Connection connection = waitForSlowLogTableCreation();
+    // add 5 records initially
+    addSlowLogPayloads(0, 5);
+    // add 2 more records
+    addSlowLogPayloads(5, 7);
+    // add 3 more records
+    addSlowLogPayloads(7, 10);
+    // add 4 more records
+    addSlowLogPayloads(10, 14);
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY
+      .waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+
+    Assert.assertNotEquals(-1,
+      HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14));
+  }
+
+  /**
+   * Adds one record per index in {@code [from, to)} to the recorder. The user name, client
+   * address and class name of each record carry the suffix {@code index + 1}.
+   *
+   * @param from first index, inclusive
+   * @param to last index, exclusive
+   */
+  private void addSlowLogPayloads(int from, int to) {
+    for (int i = from; i < to; i++) {
+      RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+        .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+  }
+
+  /**
+   * Counts the rows currently present in the slow log system table.
+   *
+   * @param connection connection used to open the table
+   * @return number of rows returned by a full scan, or 0 if the scan failed
+   */
+  private int getTableCount(Connection connection) {
+    // Both the table and the scanner are closed when the try block exits.
+    try (Table table = connection.getTable(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME);
+        ResultScanner resultScanner =
+          table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM))) {
+      int count = 0;
+      for (Result ignored : resultScanner) {
+        ++count;
+      }
+      return count;
+    } catch (Exception e) {
+      // Callers poll this method, so a failed scan counts as "no rows yet"; leave a trace of it.
+      LOG.debug("Failed to count records of SlowLog table, returning 0", e);
+      return 0;
+    }
+  }
+
+  /**
+   * Waits until the slow log system table is reported as existing.
+   *
+   * @return the connection of region server 0, which was used for the existence check
+   */
+  private Connection waitForSlowLogTableCreation() {
+    Connection connection =
+      HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0).getConnection();
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(2000, () -> {
+      try {
+        return MetaTableAccessor.tableExists(connection, SlowLogTableAccessor.SLOW_LOG_TABLE_NAME);
+      } catch (IOException e) {
+        return false;
+      }
+    }));
+    return connection;
+  }
+
+  /**
+   * Submits 100 asynchronous tasks that add 350 records each and expects more than 2000 records
+   * in the recorder and in the system table.
+   */
+  @Test
+  public void testHigherSlowLogs() throws Exception {
+    Connection connection = waitForSlowLogTableCreation();
+
+    slowLogRecorder.clearSlowLogPayloads();
+    AdminProtos.SlowLogResponseRequest request =
+      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
+    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request).size());
+
+    for (int j = 0; j < 100; j++) {
+      CompletableFuture.runAsync(() -> {
+        addSlowLogPayloads(0, 300);
+        // pause once per task before adding the remaining 50 records
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        addSlowLogPayloads(300, 350);
+      });
+    }
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
+      int count = slowLogRecorder.getSlowLogPayloads(request).size();
+      LOG.debug("RingBuffer records count: {}", count);
+      return count > 2000;
+    }));
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
+      int count = getTableCount(connection);
+      LOG.debug("SlowLog Table records count: {}", count);
+      return count > 2000;
+    }));
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
index bdd5c89..863e27b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
@@ -486,18 +486,19 @@ public class TestSlowLogRecorder {
 
   }
 
-  private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
-      String className) {
-    return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, 
className, true, true);
+  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, 
String className) {
+    RpcCall rpcCall = getRpcCall(userName);
+    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 
className, true, true);
   }
 
   private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
       String className, boolean isSlowLog, boolean isLargeLog) {
-    return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, 
className, isSlowLog,
+    RpcCall rpcCall = getRpcCall(userName);
+    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 
className, isSlowLog,
       isLargeLog);
   }
 
-  private RpcCall getRpcCall(String userName) {
+  private static RpcCall getRpcCall(String userName) {
     RpcCall rpcCall = new RpcCall() {
       @Override
       public BlockingService getService() {
@@ -646,7 +647,7 @@ public class TestSlowLogRecorder {
     return rpcCall;
   }
 
-  private Message getMessage() {
+  private static Message getMessage() {
 
     i = (i + 1) % 3;
 
@@ -693,7 +694,7 @@ public class TestSlowLogRecorder {
 
   }
 
-  private Optional<User> getUser(String userName) {
+  private static Optional<User> getUser(String userName) {
 
     return Optional.of(new User() {
 

Reply via email to