saintstack commented on a change in pull request #2127:
URL: https://github.com/apache/hbase/pull/2127#discussion_r459587451
##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
##########
@@ -1583,6 +1583,16 @@
"hbase.regionserver.slowlog.systable.enabled";
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
+ /**
+ * Number of rows in a batch operation above which a warning will be logged.
+ */
+ public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+ /**
+ * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+ */
+ public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
Review comment:
Why the move? HConstants is often considered an anti-pattern; better to
have constants beside where they are used?
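
To illustrate the suggestion, a minimal sketch of keeping the key and its default beside the code that reads them. `ReplicationSinkConfigSketch` and `rowSizeWarnThreshold` are hypothetical names, and a plain `Map` stands in for the Hadoop `Configuration` object so the example is self-contained; only the key string and the default of 5000 come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplicationSinkConfigSketch {
  // Key and default live next to their only reader instead of in a shared constants class.
  public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
  public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

  // Hypothetical helper: a Map stands in for the real configuration object.
  public static int rowSizeWarnThreshold(Map<String, String> conf) {
    String value = conf.get(BATCH_ROWS_THRESHOLD_NAME);
    return value == null ? BATCH_ROWS_THRESHOLD_DEFAULT : Integer.parseInt(value);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    System.out.println(rowSizeWarnThreshold(conf)); // falls back to the default
    conf.put(BATCH_ROWS_THRESHOLD_NAME, "100");
    System.out.println(rowSizeWarnThreshold(conf)); // uses the configured value
  }
}
```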
##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
##########
@@ -93,15 +94,21 @@
private SourceFSConfigurationProvider provider;
private WALEntrySinkFilter walEntrySinkFilter;
+ /**
+ * Row size threshold for multi requests above which a warning is logged
+ */
+ private final int rowSizeWarnThreshold;
+
/**
* Create a sink for replication
* @param conf conf object
- * @param stopper boolean to tell this thread to stop
* @throws IOException thrown when HDFS goes bad or bad file name
*/
- public ReplicationSink(Configuration conf, Stoppable stopper)
+ public ReplicationSink(Configuration conf)
Review comment:
Is the Stoppable unused? Usually it's a chain for a Service to pull on
when it meets a condition it can't deal with... one that is so bad it wants to
stop the process.
##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
##########
@@ -403,13 +398,24 @@ public void stopReplicationSinkServices() {
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
+ * @param batchRowSizeThreshold rowSize threshold for batch mutation
*/
- private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+ private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+ throws IOException {
if (allRows.isEmpty()) {
return;
}
AsyncTable<?> table = getConnection().getTable(tableName);
- List<Future<?>> futures = allRows.stream().map(table::batchAll).collect(Collectors.toList());
+ List<Future<?>> futures = new ArrayList<>();
+ for (List<Row> rows : allRows) {
+ List<List<Row>> batchRows;
+ if (rows.size() > batchRowSizeThreshold) {
+ batchRows = Lists.partition(rows, batchRowSizeThreshold);
+ } else {
+ batchRows = Collections.singletonList(rows);
+ }
+ futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
+ }
Review comment:
Do the edits that are beyond the limit get handled in the next batch?
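
For reference on that question, a minimal sketch in plain JDK code of how chunking a list by a threshold behaves. It assumes the semantics of Guava's `Lists.partition`, which the diff calls: consecutive sublists of the given size, with a possibly smaller final one. `BatchPartitionSketch` and its `partition` method are hypothetical stand-ins, not HBase or Guava code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchPartitionSketch {
  // Splits rows into consecutive chunks of at most `threshold` elements.
  // Lists at or under the threshold come back as a single chunk, as in the diff's else branch.
  public static <T> List<List<T>> partition(List<T> rows, int threshold) {
    if (threshold <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    if (rows.size() <= threshold) {
      return Collections.singletonList(rows);
    }
    List<List<T>> chunks = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += threshold) {
      int end = Math.min(start + threshold, rows.size());
      chunks.add(rows.subList(start, end)); // view over the original list, no copy
    }
    return chunks;
  }

  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 12; i++) {
      rows.add(i);
    }
    // With a threshold of 5, the 12 rows split into chunks of 5, 5 and 2.
    for (List<Integer> chunk : partition(rows, 5)) {
      System.out.println(chunk);
    }
  }
}
```

Under that assumption, no row is left for a later call: every chunk is produced in the same pass, and in the diff each chunk is handed to `table::batchAll` inside the same loop iteration.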