This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new d88a87f HBASE-21909 Validate the put instance before executing in AsyncTable.put method
d88a87f is described below
commit d88a87fd0406430604ce852c5a1ab9c9ad66c3aa
Author: zhangduo <[email protected]>
AuthorDate: Fri Feb 15 20:57:56 2019 +0800
HBASE-21909 Validate the put instance before executing in AsyncTable.put method
Signed-off-by: Michael Stack <[email protected]>
---
.../hbase/client/AsyncBufferedMutatorBuilder.java | 7 +++++++
.../client/AsyncBufferedMutatorBuilderImpl.java | 15 +++++++++++++--
.../hbase/client/AsyncBufferedMutatorImpl.java | 14 ++++++++++++--
.../hbase/client/AsyncConnectionConfiguration.java | 9 +++++++++
.../hadoop/hbase/client/BufferedMutatorImpl.java | 2 +-
.../hadoop/hbase/client/ConnectionUtils.java | 16 ++++++++++++++++
.../org/apache/hadoop/hbase/client/HTable.java | 22 +++-------------------
.../hadoop/hbase/client/HTableMultiplexer.java | 4 ++--
.../hadoop/hbase/client/RawAsyncTableImpl.java | 6 ++++++
.../hbase/client/TestAsyncBufferMutator.java | 6 +++---
10 files changed, 72 insertions(+), 29 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index c617c8e..ea2528d 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -98,6 +98,13 @@ public interface AsyncBufferedMutatorBuilder {
AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize);
/**
+ * Override the maximum key-value size specified by the provided {@link AsyncConnection}'s
+ * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
+ * {@code hbase.client.keyvalue.maxsize}.
+ */
+ AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
+
+ /**
* Create the {@link AsyncBufferedMutator} instance.
*/
AsyncBufferedMutator build();
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index eb8af17..cd04963 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -37,11 +37,14 @@ class AsyncBufferedMutatorBuilderImpl implements
AsyncBufferedMutatorBuilder {
private long periodicFlushTimeoutNs;
+ private int maxKeyValueSize;
+
public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
AsyncTableBuilder<?> tableBuilder, HashedWheelTimer
periodicalFlushTimer) {
this.tableBuilder = tableBuilder;
this.writeBufferSize = connConf.getWriteBufferSize();
this.periodicFlushTimeoutNs =
connConf.getWriteBufferPeriodicFlushTimeoutNs();
+ this.maxKeyValueSize = connConf.getMaxKeyValueSize();
this.periodicalFlushTimer = periodicalFlushTimer;
}
@@ -77,7 +80,7 @@ class AsyncBufferedMutatorBuilderImpl implements
AsyncBufferedMutatorBuilder {
@Override
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
- Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must
be >= 0",
+ Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must
be > 0",
writeBufferSize);
this.writeBufferSize = writeBufferSize;
return this;
@@ -90,8 +93,16 @@ class AsyncBufferedMutatorBuilderImpl implements
AsyncBufferedMutatorBuilder {
}
@Override
+ public AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize) {
+ Preconditions.checkArgument(maxKeyValueSize > 0, "maxKeyValueSize %d must
be > 0",
+ maxKeyValueSize);
+ this.maxKeyValueSize = maxKeyValueSize;
+ return this;
+ }
+
+ @Override
public AsyncBufferedMutator build() {
return new AsyncBufferedMutatorImpl(periodicalFlushTimer,
tableBuilder.build(), writeBufferSize,
- periodicFlushTimeoutNs);
+ periodicFlushTimeoutNs, maxKeyValueSize);
}
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 61d49af..7aa9597 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
@@ -49,6 +50,8 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
private final long periodicFlushTimeoutNs;
+ private final int maxKeyValueSize;
+
private List<Mutation> mutations = new ArrayList<>();
private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -61,11 +64,12 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
Timeout periodicFlushTask;
AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer,
AsyncTable<?> table,
- long writeBufferSize, long periodicFlushTimeoutNs) {
+ long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
this.periodicalFlushTimer = periodicalFlushTimer;
this.table = table;
this.writeBufferSize = writeBufferSize;
this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
+ this.maxKeyValueSize = maxKeyValueSize;
}
@Override
@@ -112,7 +116,13 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
List<CompletableFuture<Void>> futures =
Stream.<CompletableFuture<Void>>
generate(CompletableFuture::new).limit(mutations.size())
.collect(Collectors.toList());
- long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
+ long heapSize = 0;
+ for (Mutation mutation : mutations) {
+ heapSize += mutation.heapSize();
+ if (mutation instanceof Put) {
+ validatePut((Put)mutation, maxKeyValueSize);
+ }
+ }
synchronized (this) {
if (closed) {
IOException ioe = new IOException("Already closed");
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 65542e4..22042c9 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -40,6 +40,8 @@ import static
org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static
org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static
org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
+import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
@@ -106,6 +108,8 @@ class AsyncConnectionConfiguration {
private final long primaryMetaScanTimeoutNs;
+ private final int maxKeyValueSize;
+
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT,
DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -142,6 +146,7 @@ class AsyncConnectionConfiguration {
this.primaryMetaScanTimeoutNs =
TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
+ this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY,
MAX_KEYVALUE_SIZE_DEFAULT);
}
long getMetaOperationTimeoutNs() {
@@ -211,4 +216,8 @@ class AsyncConnectionConfiguration {
long getPrimaryMetaScanTimeoutNs() {
return primaryMetaScanTimeoutNs;
}
+
+ int getMaxKeyValueSize() {
+ return maxKeyValueSize;
+ }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index d4bc811..f0c8da4 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -188,7 +188,7 @@ public class BufferedMutatorImpl implements BufferedMutator
{
int toAddCount = 0;
for (Mutation m : ms) {
if (m instanceof Put) {
- HTable.validatePut((Put) m, maxKeyValueSize);
+ ConnectionUtils.validatePut((Put) m, maxKeyValueSize);
}
toAddSize += m.heapSize();
++toAddCount;
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 8e050df..3b6560f 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -572,4 +572,20 @@ public final class ConnectionUtils {
});
return future;
}
+
+ // validate for well-formedness
+ static void validatePut(Put put, int maxKeyValueSize) throws
IllegalArgumentException {
+ if (put.isEmpty()) {
+ throw new IllegalArgumentException("No columns to insert");
+ }
+ if (maxKeyValueSize > 0) {
+ for (List<Cell> list : put.getFamilyCellMap().values()) {
+ for (Cell cell : list) {
+ if (cell.getSerializedSize() > maxKeyValueSize) {
+ throw new IllegalArgumentException("KeyValue size too large");
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 15a189c..9b3afd9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -27,7 +27,6 @@ import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -920,24 +919,8 @@ public class HTable implements Table {
}
// validate for well-formedness
- public void validatePut(final Put put) throws IllegalArgumentException {
- validatePut(put, connConfiguration.getMaxKeyValueSize());
- }
-
- // validate for well-formedness
- public static void validatePut(Put put, int maxKeyValueSize) throws
IllegalArgumentException {
- if (put.isEmpty()) {
- throw new IllegalArgumentException("No columns to insert");
- }
- if (maxKeyValueSize > 0) {
- for (List<Cell> list : put.getFamilyCellMap().values()) {
- for (Cell cell : list) {
- if (cell.getSerializedSize() > maxKeyValueSize) {
- throw new IllegalArgumentException("KeyValue size too large");
- }
- }
- }
- }
+ private void validatePut(final Put put) throws IllegalArgumentException {
+ ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
}
/**
@@ -1261,6 +1244,7 @@ public class HTable implements Table {
@Override
public boolean thenPut(Put put) throws IOException {
+ validatePut(put);
preCheck();
return doCheckAndPut(row, family, qualifier, op.name(), value,
timeRange, put);
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index c2edb89..56c551f 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -195,7 +195,7 @@ public class HTableMultiplexer {
}
try {
- HTable.validatePut(put, maxKeyValueSize);
+ ConnectionUtils.validatePut(put, maxKeyValueSize);
// Allow mocking to get at the connection, but don't expose the
connection to users.
ClusterConnection conn = (ClusterConnection) getConnection();
// AsyncProcess in the FlushWorker should take care of refreshing the
location cache
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 7562e6f..96fa85d 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
@@ -235,6 +236,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> put(Put put) {
+ validatePut(put, conn.connConf.getMaxKeyValueSize());
return this.<Void> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put>
voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
@@ -326,6 +328,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenPut(Put put) {
+ validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean>
mutate(controller, loc,
@@ -478,6 +481,9 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
+ for (Put put : puts) {
+ validatePut(put, conn.connConf.getMaxKeyValueSize());
+ }
return voidMutate(puts);
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 6eed326..5e7f6cc 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -245,8 +245,8 @@ public class TestAsyncBufferMutator {
private int flushCount;
AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer,
AsyncTable<?> table,
- long writeBufferSize, long periodicFlushTimeoutNs) {
- super(periodicalFlushTimer, table, writeBufferSize,
periodicFlushTimeoutNs);
+ long writeBufferSize, long periodicFlushTimeoutNs, int
maxKeyValueSize) {
+ super(periodicalFlushTimer, table, writeBufferSize,
periodicFlushTimeoutNs, maxKeyValueSize);
}
@Override
@@ -262,7 +262,7 @@ public class TestAsyncBufferMutator {
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
try (AsyncBufferMutatorForTest mutator =
new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER,
CONN.getTable(TABLE_NAME),
- 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
+ 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024))
{
CompletableFuture<?> future = mutator.mutate(put);
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task