This is an automated email from the ASF dual-hosted git repository.
Apache9 pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new bec4dec35c4 HBASE-30183 Refactoring scanner related code in
hbase-thrift and hbase-rest (#8284) (#8303)
bec4dec35c4 is described below
commit bec4dec35c4034da9c3f3d0da2510914fe41b3fa
Author: Duo Zhang <[email protected]>
AuthorDate: Tue Jun 2 22:26:10 2026 +0800
HBASE-30183 Refactoring scanner related code in hbase-thrift and hbase-rest
(#8284) (#8303)
(cherry picked from commit 095de1d4421c15f411dec79f79a49cf605986336)
Signed-off-by: Xiao Liu <[email protected]>
---
.../hadoop/hbase/rest/ScannerInstanceResource.java | 46 +++++++--
.../apache/hadoop/hbase/rest/ScannerResource.java | 4 +-
.../hadoop/hbase/rest/TestSecureRESTServer.java | 88 ++++++++++++++++-
.../hadoop/hbase/thrift/HBaseServiceHandler.java | 90 +++++++++++++++++
.../hbase/thrift/ThriftHBaseServiceHandler.java | 110 +++++----------------
.../hbase/thrift2/ThriftHBaseServiceHandler.java | 73 ++++----------
.../hadoop/hbase/thrift/TestThriftServer.java | 8 +-
.../hbase/thrift/TestThriftSpnegoHttpServer.java | 42 +++++++-
.../TestThriftHBaseServiceHandlerWithLabels.java | 4 -
9 files changed, 299 insertions(+), 166 deletions(-)
diff --git
a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerInstanceResource.java
b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerInstanceResource.java
index 28ba60fa19b..c299b68b8b7 100644
---
a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerInstanceResource.java
+++
b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerInstanceResource.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.rest;
import java.io.IOException;
import java.util.Base64;
+import java.util.Objects;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableNotFoundException;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,18 +55,31 @@ public class ScannerInstanceResource extends ResourceBase {
ResultGenerator generator = null;
String id = null;
+ String owner;
int batch = 1;
public ScannerInstanceResource() throws IOException {
}
- public ScannerInstanceResource(String table, String id, ResultGenerator
generator, int batch)
+ public ScannerInstanceResource(String id, String owner, ResultGenerator
generator, int batch)
throws IOException {
this.id = id;
+ this.owner = owner;
this.generator = generator;
this.batch = batch;
}
+ private Response checkOwner() {
+ ConnectionCache connCache = RESTServlet.getInstance().getConnectionCache();
+ if (!Objects.equals(connCache.getEffectiveUser(), owner)) {
+ LOG.warn("User {} is trying to access scanner {} which belongs to user
{}",
+ connCache.getEffectiveUser(), id, owner);
+ return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT)
+ .entity("Not allowed" + CRLF).build();
+ }
+ return null;
+ }
+
@GET
@Produces({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF,
MIMETYPE_PROTOBUF_IETF })
public Response get(final @Context UriInfo uriInfo, @QueryParam("n") int
maxRows,
@@ -77,10 +92,13 @@ public class ScannerInstanceResource extends ResourceBase {
servlet.getMetrics().incrementFailedGetRequests(1);
return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
.entity("Not found" + CRLF).build();
- } else {
- // Updated the connection access time for each client next() call
-
RESTServlet.getInstance().getConnectionCache().updateConnectionAccessTime();
}
+ Response checkResp = checkOwner();
+ if (checkResp != null) {
+ return checkResp;
+ }
+ // Updated the connection access time for each client next() call
+
RESTServlet.getInstance().getConnectionCache().updateConnectionAccessTime();
CellSetModel model = new CellSetModel();
RowModel rowModel = null;
byte[] rowKeyArray = null;
@@ -159,13 +177,18 @@ public class ScannerInstanceResource extends ResourceBase
{
if (LOG.isTraceEnabled()) {
LOG.trace("GET " + uriInfo.getAbsolutePath() + " as " + MIMETYPE_BINARY);
}
+
servlet.getMetrics().incrementRequests(1);
+ if (generator == null) {
+ servlet.getMetrics().incrementFailedGetRequests(1);
+ return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
+ .entity("Not found" + CRLF).build();
+ }
+ Response checkResp = checkOwner();
+ if (checkResp != null) {
+ return checkResp;
+ }
try {
- if (generator == null) {
- servlet.getMetrics().incrementFailedGetRequests(1);
- return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
- .entity("Not found" + CRLF).build();
- }
Cell value = generator.next();
if (value == null) {
if (LOG.isTraceEnabled()) {
@@ -199,6 +222,7 @@ public class ScannerInstanceResource extends ResourceBase {
if (LOG.isTraceEnabled()) {
LOG.trace("DELETE " + uriInfo.getAbsolutePath());
}
+
servlet.getMetrics().incrementRequests(1);
if (servlet.isReadOnly()) {
return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT)
@@ -209,6 +233,10 @@ public class ScannerInstanceResource extends ResourceBase {
return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
.entity("Not found" + CRLF).build();
}
+ Response checkResp = checkOwner();
+ if (checkResp != null) {
+ return checkResp;
+ }
if (ScannerResource.delete(id)) {
servlet.getMetrics().incrementSucessfulDeleteRequests(1);
} else {
diff --git
a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerResource.java
b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerResource.java
index 59b60982ef1..4878ca73e6d 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerResource.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerResource.java
@@ -114,8 +114,8 @@ public class ScannerResource extends ResourceBase {
new ScannerResultGenerator(tableName, spec, filter, model.getCaching(),
model.getCacheBlocks(), model.isIncludeStartRow(),
model.isIncludeStopRow());
String id = gen.getID();
- ScannerInstanceResource instance =
- new ScannerInstanceResource(tableName, id, gen, model.getBatch());
+ ScannerInstanceResource instance = new ScannerInstanceResource(id,
+ RESTServlet.getInstance().getConnectionCache().getEffectiveUser(),
gen, model.getBatch());
scanners.put(id, instance);
if (LOG.isTraceEnabled()) {
LOG.trace("new scanner: " + id);
diff --git
a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java
b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java
index 38d9637794f..472343dcc7f 100644
---
a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java
+++
b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.apache.hadoop.hbase.rest.model.ScannerModel;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.AccessControlConstants;
@@ -70,7 +71,9 @@ import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
@@ -110,6 +113,7 @@ public class TestSecureRESTServer {
private static final String HOSTNAME = "localhost";
private static final String CLIENT_PRINCIPAL = "client";
+ private static final String CLIENT_PRINCIPAL2 = "client2";
private static final String WHEEL_PRINCIPAL = "wheel";
// The principal for accepting SPNEGO authn'ed requests (*must* be HTTP/fqdn)
private static final String SPNEGO_SERVICE_PRINCIPAL = "HTTP/" + HOSTNAME;
@@ -156,7 +160,7 @@ public class TestSecureRESTServer {
* Start KDC
*/
KDC = TEST_UTIL.setupMiniKdc(serviceKeytab);
- KDC.createPrincipal(clientKeytab, CLIENT_PRINCIPAL);
+ KDC.createPrincipal(clientKeytab, CLIENT_PRINCIPAL, CLIENT_PRINCIPAL2);
KDC.createPrincipal(wheelKeytab, WHEEL_PRINCIPAL);
KDC.createPrincipal(serviceKeytab, SERVICE_PRINCIPAL);
// REST server's keytab contains keys for both principals REST uses
@@ -189,7 +193,7 @@ public class TestSecureRESTServer {
updateKerberosConfiguration(conf, REST_SERVER_PRINCIPAL,
SPNEGO_SERVICE_PRINCIPAL,
restServerKeytab);
- // Start HDFS
+ // Start HBase
TEST_UTIL.startMiniCluster(
StartMiniClusterOption.builder().numMasters(1).numRegionServers(1).numZkServers(1).build());
@@ -330,10 +334,10 @@ public class TestSecureRESTServer {
});
}
- public void testProxy(String extraArgs, String PRINCIPAL, File keytab, int
responseCode)
+ private void testProxy(String extraArgs, String PRINCIPAL, File keytab, int
responseCode)
throws Exception {
- UserGroupInformation superuser = UserGroupInformation
- .loginUserFromKeytabAndReturnUGI(SERVICE_PRINCIPAL,
serviceKeytab.getAbsolutePath());
+ UserGroupInformation.loginUserFromKeytabAndReturnUGI(SERVICE_PRINCIPAL,
+ serviceKeytab.getAbsolutePath());
final TableName table = TableName.valueOf("publicTable");
// Read that row as the client
@@ -417,6 +421,80 @@ public class TestSecureRESTServer {
});
}
+ @Test
+ public void testScanWithDifferentClients() throws Exception {
+ Pair<CloseableHttpClient, HttpClientContext> pair = getClient();
+ CloseableHttpClient client = pair.getFirst();
+ HttpClientContext context = pair.getSecond();
+
+ UserGroupInformation ugi = UserGroupInformation
+ .loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL,
clientKeytab.getAbsolutePath());
+
+ ObjectMapper mapper = new
JacksonJaxbJsonProvider().locateMapper(ScannerModel.class,
+ MediaType.APPLICATION_JSON_TYPE);
+ TableName table = TableName.valueOf("publicTable");
+ ScannerModel model = new ScannerModel();
+ StringEntity entity =
+ new StringEntity(mapper.writeValueAsString(model),
ContentType.APPLICATION_JSON);
+ HttpPost post =
+ new HttpPost("http://localhost:" + REST_TEST.getServletPort() + "/" +
table + "/scanner");
+ post.setEntity(entity);
+ String scannerURI = ugi.doAs(new PrivilegedExceptionAction<String>() {
+
+ @Override
+ public String run() throws Exception {
+ try (CloseableHttpResponse response = client.execute(post, context)) {
+ final int statusCode = response.getStatusLine().getStatusCode();
+ assertEquals(HttpURLConnection.HTTP_CREATED, statusCode);
+ return response.getFirstHeader("Location").getValue();
+ }
+ }
+ });
+
+ Pair<CloseableHttpClient, HttpClientContext> pair2 = getClient();
+ CloseableHttpClient client2 = pair2.getFirst();
+ HttpClientContext context2 = pair2.getSecond();
+
+ UserGroupInformation ugi2 = UserGroupInformation
+ .loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL2,
clientKeytab.getAbsolutePath());
+ ugi2.doAs(new PrivilegedExceptionAction<Void>() {
+
+ @Override
+ public Void run() throws Exception {
+ HttpGet get = new HttpGet(scannerURI + "?n=1");
+ try (CloseableHttpResponse response = client2.execute(get, context2)) {
+ final int statusCode = response.getStatusLine().getStatusCode();
+ assertEquals(HttpURLConnection.HTTP_FORBIDDEN, statusCode);
+ }
+ HttpDelete delete = new HttpDelete(scannerURI);
+ try (CloseableHttpResponse response = client2.execute(delete,
context2)) {
+ final int statusCode = response.getStatusLine().getStatusCode();
+ assertEquals(HttpURLConnection.HTTP_FORBIDDEN, statusCode);
+ }
+ return null;
+ }
+ });
+
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+
+ @Override
+ public Void run() throws Exception {
+ HttpGet get = new HttpGet(scannerURI + "?n=1");
+ try (CloseableHttpResponse response = client.execute(get, context)) {
+ final int statusCode = response.getStatusLine().getStatusCode();
+ assertEquals(HttpURLConnection.HTTP_OK, statusCode);
+ }
+ HttpDelete delete = new HttpDelete(scannerURI);
+ try (CloseableHttpResponse response = client.execute(delete, context))
{
+ final int statusCode = response.getStatusLine().getStatusCode();
+ assertEquals(HttpURLConnection.HTTP_OK, statusCode);
+ }
+ return null;
+ }
+ });
+
+ }
+
private Pair<CloseableHttpClient, HttpClientContext> getClient() {
HttpClientConnectionManager pool = new
PoolingHttpClientConnectionManager();
HttpHost host = new HttpHost("localhost", REST_TEST.getServletPort());
diff --git
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
index 79f5d4ee830..6b28c4c9554 100644
---
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
+++
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
@@ -17,22 +17,41 @@
*/
package org.apache.hadoop.hbase.thrift;
+import static
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConnectionCache;
+import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.RemovalCause;
/**
* abstract class for HBase handler providing a Connection cache and get
table/admin method
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public abstract class HBaseServiceHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseServiceHandler.class);
+
public static final String CLEANUP_INTERVAL =
"hbase.thrift.connection.cleanup-interval";
public static final String MAX_IDLETIME =
"hbase.thrift.connection.max-idletime";
@@ -40,12 +59,38 @@ public abstract class HBaseServiceHandler {
protected final ConnectionCache connectionCache;
+ protected static final class ResultScannerWrapper {
+ public final ResultScanner scanner;
+ public final boolean sortColumns;
+ public final String owner;
+
+ public ResultScannerWrapper(ResultScanner scanner, boolean sortColumns,
String owner) {
+ this.scanner = scanner;
+ this.sortColumns = sortColumns;
+ this.owner = owner;
+ }
+ }
+
+ private final AtomicInteger nextScannerId = new AtomicInteger(0);
+ private final Cache<Integer, ResultScannerWrapper> scannerMap;
+ private final KeyLocker<Integer> removeScannerLock = new KeyLocker<>();
+
public HBaseServiceHandler(final Configuration c, final UserProvider
userProvider)
throws IOException {
this.conf = c;
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCache = new ConnectionCache(conf, userProvider, cleanInterval,
maxIdleTime);
+ long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+ scannerMap = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout,
TimeUnit.MILLISECONDS)
+ .removalListener(notification -> {
+ // do not close the scanner if it is removed manually, we will either add it back or close
+ // it manually.
+ if (notification.getCause() != RemovalCause.EXPLICIT) {
+ ((ResultScannerWrapper) notification.getValue()).scanner.close();
+ }
+ }).build();
}
protected ThriftMetrics metrics = null;
@@ -58,6 +103,51 @@ public abstract class HBaseServiceHandler {
connectionCache.setEffectiveUser(effectiveUser);
}
+ /**
+ * Assigns a unique ID to the scanner and adds the mapping to the internal scanner cache.
+ * @param scanner to add
+ * @return Id for this Scanner
+ */
+ protected int addScanner(ResultScanner scanner, boolean sortColumns) {
+ int id = nextScannerId.getAndIncrement();
+ ResultScannerWrapper wrapper =
+ new ResultScannerWrapper(scanner, sortColumns,
connectionCache.getEffectiveUser());
+ scannerMap.put(id, wrapper);
+ return id;
+ }
+
+ /**
+ * Add the given scanner back to scanner map.
+ * <p>
+ * When scanning, we need to remove the scanner from the scanner map to prevent expiration during
+ * scanning.
+ */
+ protected void addScannerBack(int id, ResultScannerWrapper wrapper) {
+ scannerMap.put(id, wrapper);
+ }
+
+ /**
+ * Removes the scanner associated with the specified ID from the internal scanner cache.
+ * @param id of the Scanner to remove
+ * @throws AccessDeniedException if the scanner does not belong to the current user
+ */
+ protected ResultScannerWrapper removeScanner(int id) throws IOException {
+ Lock lock = removeScannerLock.acquireLock(id);
+ try {
+ ResultScannerWrapper wrapper = scannerMap.getIfPresent(id);
+ if (wrapper != null &&
!Objects.equals(connectionCache.getEffectiveUser(), wrapper.owner)) {
+ LOG.warn("User {} is trying to access scanner id = {} where owner =
{}",
+ connectionCache.getEffectiveUser(), id, wrapper.owner);
+ throw new AccessDeniedException(
+ "User " + connectionCache.getEffectiveUser() + " is not allowed to
access scanner " + id);
+ }
+ scannerMap.invalidate(id);
+ return wrapper;
+ } finally {
+ lock.unlock();
+ }
+ }
+
/**
* Obtain HBaseAdmin. Creates the instance if it is not already created.
*/
diff --git
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
index a0b174e8329..e074751ef85 100644
---
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
+++
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.thrift;
-import static
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
-import static
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
import static org.apache.hadoop.hbase.util.Bytes.getBytes;
@@ -30,7 +28,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
@@ -86,8 +83,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
/**
* The HBaseServiceHandler is a glue object that connects Thrift RPC calls to
the HBase client API
@@ -100,9 +95,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
public static final int HREGION_VERSION = 1;
- // nextScannerId and scannerMap are used to manage scanner state
- private int nextScannerId = 0;
- private Cache<Integer, ResultScannerWrapper> scannerMap;
IncrementCoalescer coalescer;
/**
@@ -118,44 +110,9 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
return columns;
}
- /**
- * Assigns a unique ID to the scanner and adds the mapping to an internal
hash-map.
- * @param scanner the {@link ResultScanner} to add
- * @return integer scanner id
- */
- protected synchronized int addScanner(ResultScanner scanner, boolean
sortColumns) {
- int id = nextScannerId++;
- ResultScannerWrapper resultScannerWrapper = new
ResultScannerWrapper(scanner, sortColumns);
- scannerMap.put(id, resultScannerWrapper);
- return id;
- }
-
- /**
- * Returns the scanner associated with the specified ID.
- * @param id the ID of the scanner to get
- * @return a Scanner, or null if ID was invalid.
- */
- private synchronized ResultScannerWrapper getScanner(int id) {
- return scannerMap.getIfPresent(id);
- }
-
- /**
- * Removes the scanner associated with the specified ID from the internal
id->scanner hash-map.
- * @param id the ID of the scanner to remove
- */
- private synchronized void removeScanner(int id) {
- scannerMap.invalidate(id);
- }
-
protected ThriftHBaseServiceHandler(final Configuration c, final
UserProvider userProvider)
throws IOException {
super(c, userProvider);
- long cacheTimeout =
- c.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)
- * 2;
-
- scannerMap =
- CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout,
TimeUnit.MILLISECONDS).build();
this.coalescer = new IncrementCoalescer(this);
}
@@ -790,40 +747,50 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
@Override
public void scannerClose(int id) throws IOError, IllegalArgument {
LOG.debug("scannerClose: id={}", id);
- ResultScannerWrapper resultScannerWrapper = getScanner(id);
+ ResultScannerWrapper resultScannerWrapper;
+ try {
+ resultScannerWrapper = removeScanner(id);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
if (resultScannerWrapper == null) {
LOG.warn("scanner ID is invalid");
throw new IllegalArgument("scanner ID is invalid");
}
- resultScannerWrapper.getScanner().close();
- removeScanner(id);
+ resultScannerWrapper.scanner.close();
}
@Override
public List<TRowResult> scannerGetList(int id, int nbRows) throws
IllegalArgument, IOError {
LOG.debug("scannerGetList: id={}", id);
- ResultScannerWrapper resultScannerWrapper = getScanner(id);
+ ResultScannerWrapper resultScannerWrapper;
+ try {
+ resultScannerWrapper = removeScanner(id);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
if (null == resultScannerWrapper) {
String message = "scanner ID is invalid";
LOG.warn(message);
throw new IllegalArgument("scanner ID is invalid");
}
-
- Result[] results;
try {
- results = resultScannerWrapper.getScanner().next(nbRows);
- if (null == results) {
- return new ArrayList<>();
+ Result[] results;
+ try {
+ results = resultScannerWrapper.scanner.next(nbRows);
+ if (null == results) {
+ return new ArrayList<>();
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
}
- } catch (IOException e) {
- LOG.warn(e.getMessage(), e);
- throw getIOError(e);
+ return ThriftUtilities.rowResultFromHBase(results,
resultScannerWrapper.sortColumns);
} finally {
- // Add scanner back to scannerMap; protects against case
- // where scanner expired during processing of request.
- scannerMap.put(id, resultScannerWrapper);
+ addScannerBack(id, resultScannerWrapper);
}
- return ThriftUtilities.rowResultFromHBase(results,
resultScannerWrapper.isColumnSorted());
}
@Override
@@ -834,7 +801,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
@Override
public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-
Table table = null;
try {
table = getTable(tableName);
@@ -887,7 +853,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
@Override
public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-
Table table = null;
try {
table = getTable(tableName);
@@ -915,7 +880,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
@Override
public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
ByteBuffer stopRow,
List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws
IOError, TException {
-
Table table = null;
try {
table = getTable(tableName);
@@ -943,7 +907,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
@Override
public int scannerOpenWithPrefix(ByteBuffer tableName, ByteBuffer
startAndPrefix,
List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws
IOError, TException {
-
Table table = null;
try {
table = getTable(tableName);
@@ -973,7 +936,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
@Override
public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
TException {
-
Table table = null;
try {
table = getTable(tableName);
@@ -1003,7 +965,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
ByteBuffer stopRow,
List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer>
attributes)
throws IOError, TException {
-
Table table = null;
try {
table = getTable(tableName);
@@ -1244,25 +1205,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements Hb
}
}
- protected static class ResultScannerWrapper {
-
- private final ResultScanner scanner;
- private final boolean sortColumns;
-
- public ResultScannerWrapper(ResultScanner resultScanner, boolean
sortResultColumns) {
- scanner = resultScanner;
- sortColumns = sortResultColumns;
- }
-
- public ResultScanner getScanner() {
- return scanner;
- }
-
- public boolean isColumnSorted() {
- return sortColumns;
- }
- }
-
public static class IOErrorWithCause extends IOError {
private final Throwable cause;
diff --git
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
index a5c9b9cf4aa..0c0ab11a4d7 100644
---
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
+++
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.thrift2;
-import static
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
-import static
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
import static
org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
@@ -52,8 +50,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -97,10 +93,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hbase.thirdparty.com.google.common.cache.RemovalListener;
-
/**
* This class is a glue object that connects Thrift RPC calls to the HBase
client API primarily
* defined in the Table interface.
@@ -112,10 +104,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements TH
// TODO: Size of pool configuraple
private static final Logger LOG =
LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
- // nextScannerId and scannerMap are used to manage scanner state
- private final AtomicInteger nextScannerId = new AtomicInteger(0);
- private final Cache<Integer, ResultScanner> scannerMap;
-
private static final IOException ioe =
new DoNotRetryIOException("Thrift Server is in Read-only mode.");
private boolean isReadOnly;
@@ -157,13 +145,7 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements TH
public ThriftHBaseServiceHandler(final Configuration conf, final
UserProvider userProvider)
throws IOException {
super(conf, userProvider);
- long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
- DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED,
THRIFT_READONLY_ENABLED_DEFAULT);
- scannerMap = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout,
TimeUnit.MILLISECONDS)
- .removalListener((RemovalListener<Integer,
- ResultScanner>) removalNotification ->
removalNotification.getValue().close())
- .build();
}
@Override
@@ -198,34 +180,6 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements TH
return err;
}
- /**
- * Assigns a unique ID to the scanner and adds the mapping to an internal
HashMap.
- * @param scanner to add
- * @return Id for this Scanner
- */
- private int addScanner(ResultScanner scanner) {
- int id = nextScannerId.getAndIncrement();
- scannerMap.put(id, scanner);
- return id;
- }
-
- /**
- * Returns the Scanner associated with the specified Id.
- * @param id of the Scanner to get
- * @return a Scanner, or null if the Id is invalid
- */
- private ResultScanner getScanner(int id) {
- return scannerMap.getIfPresent(id);
- }
-
- /**
- * Removes the scanner associated with the specified ID from the internal
HashMap.
- * @param id of the Scanner to remove
- */
- protected void removeScanner(int id) {
- scannerMap.invalidate(id);
- }
-
@Override
public boolean exists(ByteBuffer table, TGet get) throws TIOError,
TException {
Table htable = getTable(table);
@@ -428,23 +382,30 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements TH
} finally {
closeTable(htable);
}
- return addScanner(resultScanner);
+ return addScanner(resultScanner, false);
}
@Override
public List<TResult> getScannerRows(int scannerId, int numRows)
throws TIOError, TIllegalArgument, TException {
- ResultScanner scanner = getScanner(scannerId);
- if (scanner == null) {
+ ResultScannerWrapper wrapper;
+ try {
+ wrapper = removeScanner(scannerId);
+ } catch (IOException e) {
+ throw getTIOError(e);
+ }
+ if (wrapper == null) {
TIllegalArgument ex = new TIllegalArgument();
ex.setMessage("Invalid scanner Id");
throw ex;
}
try {
connectionCache.updateConnectionAccessTime();
- return resultsFromHBase(scanner.next(numRows));
+ return resultsFromHBase(wrapper.scanner.next(numRows));
} catch (IOException e) {
throw getTIOError(e);
+ } finally {
+ addScannerBack(scannerId, wrapper);
}
}
@@ -471,15 +432,19 @@ public class ThriftHBaseServiceHandler extends
HBaseServiceHandler implements TH
@Override
public void closeScanner(int scannerId) throws TIOError, TIllegalArgument,
TException {
LOG.debug("scannerClose: id=" + scannerId);
- ResultScanner scanner = getScanner(scannerId);
- if (scanner == null) {
+ ResultScannerWrapper wrapper;
+ try {
+ wrapper = removeScanner(scannerId);
+ } catch (IOException e) {
+ throw getTIOError(e);
+ }
+ if (wrapper == null) {
LOG.warn("scanner ID: " + scannerId + "is invalid");
// While the scanner could be already expired,
// we should not throw exception here. Just log and return.
return;
}
- scanner.close();
- removeScanner(scannerId);
+ wrapper.scanner.close();
}
@Override
diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
index dd420e3745a..9e82ac86666 100644
--- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
+++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
@@ -101,14 +101,14 @@ public class TestThriftServer {
}
// Static names for tables, columns, rows, and values
- private static ByteBuffer tableAname = asByteBuffer("tableA");
+ public static ByteBuffer tableAname = asByteBuffer("tableA");
private static ByteBuffer tableBname = asByteBuffer("tableB");
private static ByteBuffer columnAname = asByteBuffer("columnA:");
- private static ByteBuffer columnAAname = asByteBuffer("columnA:A");
+ public static ByteBuffer columnAAname = asByteBuffer("columnA:A");
private static ByteBuffer columnBname = asByteBuffer("columnB:");
- private static ByteBuffer rowAname = asByteBuffer("rowA");
+ public static ByteBuffer rowAname = asByteBuffer("rowA");
private static ByteBuffer rowBname = asByteBuffer("rowB");
- private static ByteBuffer valueAname = asByteBuffer("valueA");
+ public static ByteBuffer valueAname = asByteBuffer("valueA");
private static ByteBuffer valueBname = asByteBuffer("valueB");
private static ByteBuffer valueCname = asByteBuffer("valueC");
private static ByteBuffer valueDname = asByteBuffer("valueD");
diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftSpnegoHttpServer.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftSpnegoHttpServer.java
index 13653d02c42..3587872dbae 100644
--- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftSpnegoHttpServer.java
+++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftSpnegoHttpServer.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.thrift;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
@@ -28,6 +30,8 @@ import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
@@ -40,6 +44,8 @@ import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.SimpleKdcServerUtil;
import org.apache.hadoop.security.authentication.util.KerberosName;
@@ -87,6 +93,7 @@ public class TestThriftSpnegoHttpServer extends TestThriftHttpServerBase {
private static File clientKeytab;
private static String clientPrincipal;
+ private static String clientPrincipal2;
private static String serverPrincipal;
private static String spnegoServerPrincipal;
@@ -116,8 +123,9 @@ public class TestThriftSpnegoHttpServer extends TestThriftHttpServerBase {
assertTrue(keytabDir.mkdirs());
clientPrincipal = "client@" + kdc.getKdcConfig().getKdcRealm();
+ clientPrincipal2 = "client2@" + kdc.getKdcConfig().getKdcRealm();
clientKeytab = new File(keytabDir, clientPrincipal + ".keytab");
- kdc.createAndExportPrincipals(clientKeytab, clientPrincipal);
+ kdc.createAndExportPrincipals(clientKeytab, clientPrincipal, clientPrincipal2);
String hostname = InetAddress.getLoopbackAddress().getHostName();
serverPrincipal = "hbase/" + hostname + "@" + kdc.getKdcConfig().getKdcRealm();
@@ -167,11 +175,32 @@ public class TestThriftSpnegoHttpServer extends TestThriftHttpServerBase {
super.testRunThriftServerWithHeaderBufferLength();
}
+ private void testScanWithDifferentClients(Hbase.Client client, Hbase.Client client2)
+ throws Exception {
+ List<Mutation> mutations = new ArrayList<>(1);
+ mutations
+ .add(new Mutation(false, TestThriftServer.columnAAname, TestThriftServer.valueAname, true));
+ client.mutateRow(TestThriftServer.tableAname, TestThriftServer.rowAname, mutations,
+ Collections.emptyMap());
+
+ int id = client.scannerOpen(TestThriftServer.tableAname, ByteBuffer.allocate(0),
+ Collections.emptyList(), Collections.emptyMap());
+
+ assertThrows(IOError.class, () -> client2.scannerGet(id)).printStackTrace();
+ assertThrows(IOError.class, () -> client2.scannerClose(id)).printStackTrace();
+
+ assertEquals(1, client.scannerGet(id).size());
+ assertEquals(0, client.scannerGet(id).size());
+ client.scannerClose(id);
+ }
+
@Override
protected void talkToThriftServer(String url, int customHeaderSize) throws Exception {
// Close httpClient and THttpClient automatically on any failures
- try (CloseableHttpClient httpClient = createHttpClient();
- THttpClient tHttpClient = new THttpClient(url, httpClient)) {
+ try (CloseableHttpClient httpClient = createHttpClient(clientPrincipal);
+ THttpClient tHttpClient = new THttpClient(url, httpClient);
+ CloseableHttpClient httpClient2 = createHttpClient(clientPrincipal2);
+ THttpClient tHttpClient2 = new THttpClient(url, httpClient2)) {
tHttpClient.open();
if (customHeaderSize > 0) {
StringBuilder sb = new StringBuilder();
@@ -194,11 +223,16 @@ public class TestThriftSpnegoHttpServer extends TestThriftHttpServerBase {
}
TestThriftServer.createTestTables(client);
TestThriftServer.checkTableList(client);
+
+ TProtocol prop2 = new TBinaryProtocol(tHttpClient2);
+ Hbase.Client client2 = new Hbase.Client(prop2);
+ testScanWithDifferentClients(client, client2);
+
TestThriftServer.dropTestTables(client);
}
}
- private CloseableHttpClient createHttpClient() throws Exception {
+ private CloseableHttpClient createHttpClient(String clientPrincipal) throws Exception {
final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(clientPrincipal, clientKeytab);
final Set<Principal> clientPrincipals = clientSubject.getPrincipals();
// Make sure the subject has a principal
diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandlerWithLabels.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandlerWithLabels.java
index e1bef3367ab..e77708b926b 100644
--- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandlerWithLabels.java
+++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandlerWithLabels.java
@@ -69,15 +69,11 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Tag(ClientTests.TAG)
@Tag(MediumTests.TAG)
public class TestThriftHBaseServiceHandlerWithLabels {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestThriftHBaseServiceHandlerWithLabels.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
// Static names for tables, columns, rows, and values