This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b38a2dbf65 Added ScanServer property for allowed tables (#6146)
b38a2dbf65 is described below

commit b38a2dbf656e83589eaffa64c89eafa47d242c2d
Author: Dave Marion <[email protected]>
AuthorDate: Wed Feb 25 07:33:49 2026 -0500

    Added ScanServer property for allowed tables (#6146)
    
    Added property that allows user to configure which tables
    are allowed to be scanned by clients at the ScanServer
    group level. Property defaults to allowing all non-accumulo
    namespace tables.
    
    Closes #6123
---
 .../org/apache/accumulo/core/conf/Property.java    |   7 +
 .../org/apache/accumulo/tserver/ScanServer.java    | 133 +++++++++-
 .../accumulo/tserver/ThriftScanClientHandler.java  |   2 +-
 .../apache/accumulo/tserver/ScanServerTest.java    |  81 ++++--
 .../accumulo/test/ScanServerAllowedTablesIT.java   | 292 +++++++++++++++++++++
 5 files changed, 480 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 17edff8d2e..90602ad616 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -561,6 +561,13 @@ public enum Property {
       "The amount of time a scan reference is unused before its deleted from 
metadata table.",
       "2.1.0"),
   @Experimental
+  SSERV_SCAN_ALLOWED_TABLES("sserver.scan.allowed.tables.group.", null, 
PropertyType.PREFIX,
+      "A regular expression that determines which tables are allowed to be 
scanned for"
+          + " servers in the specified group. The property name should end 
with the scan server"
+          + " group and the property value should take into account the table 
namespace and name."
+          + " The default value disallows scans on tables in the accumulo 
namespace.",
+      "2.1.5"),
+  @Experimental
   SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s", 
PropertyType.TIMEDURATION,
       "The time between adjustments of the thrift server thread pool.", 
"2.1.0"),
   // properties that are specific to tablet server behavior
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 83f57347b1..53b121f08d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.tserver;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_SERVER_TABLET_METADATA_CACHE_POOL;
 
@@ -45,6 +46,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -54,6 +58,7 @@ import 
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.cluster.ClusterConfigParser;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
 import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
@@ -88,6 +93,7 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.AbstractServer;
@@ -111,6 +117,7 @@ import 
org.apache.accumulo.tserver.session.SingleScanSession;
 import org.apache.accumulo.tserver.tablet.SnapshotTablet;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.accumulo.tserver.tablet.TabletBase;
+import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.zookeeper.KeeperException;
@@ -177,6 +184,8 @@ public class ScanServer extends AbstractServer
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+  // Default pattern to allow scans on all tables not in accumulo namespace
+  private static final String DEFAULT_SCAN_ALLOWED_PATTERN = 
"^(?!accumulo\\.).*$";
 
   protected ThriftScanClientHandler delegate;
   private UUID serverLockUUID;
@@ -213,6 +222,9 @@ public class ScanServer extends AbstractServer
 
   private final String groupName;
 
+  private final ConcurrentHashMap<TableId,Boolean> allowedTables = new 
ConcurrentHashMap<>();
+  private volatile String currentAllowedTableRegex;
+
   public ScanServer(ScanServerOpts opts, String[] args) {
     super("sserver", opts, args);
 
@@ -388,6 +400,7 @@ public class ScanServer extends AbstractServer
     }
 
     SecurityUtil.serverLogin(getConfiguration());
+    updateAllowedTables(false);
 
     ServerAddress address = null;
     try {
@@ -423,6 +436,7 @@ public class ScanServer extends AbstractServer
           Thread.sleep(1000);
           updateIdleStatus(sessionManager.getActiveScans().isEmpty()
               && tabletMetadataCache.estimatedSize() == 0);
+          updateAllowedTables(false);
         } catch (InterruptedException e) {
           LOG.info("Interrupt Exception received, shutting down");
           gracefulShutdown(getContext().rpcCreds());
@@ -477,6 +491,106 @@ public class ScanServer extends AbstractServer
     }
   }
 
+  // Visible for testing
+  protected boolean isAllowed(TCredentials credentials, TableId tid)
+      throws ThriftSecurityException {
+    Boolean result = allowedTables.get(tid);
+    if (result == null) {
+
+      final Retry retry =
+          Retry.builder().maxRetries(10).retryAfter(1, SECONDS).incrementBy(0, 
SECONDS)
+              .maxWait(2, SECONDS).backOffFactor(1.0).logInterval(3, 
SECONDS).createRetry();
+
+      while (result == null && retry.canRetry()) {
+        try {
+          retry.waitForNextAttempt(LOG,
+              "Allowed tables mapping does not contain an entry for table: " + 
tid
+                  + ", refreshing table...");
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOG.error("Interrupted while waiting for next retry", e);
+          break;
+        }
+        // Clear the cache and try again, maybe there
+        // is a race condition in table creation and scan
+        updateAllowedTables(true);
+        // validate that the table exists, else throw
+        delegate.getNamespaceId(credentials, tid);
+        result = allowedTables.get(tid);
+        retry.useRetry();
+      }
+
+      if (result == null) {
+        // Ran out of retries
+        throw new IllegalStateException(
+            "Unable to get allowed table mapping for table: " + tid + " within 
10s");
+      }
+    }
+    return result;
+  }
+
+  private synchronized void updateAllowedTables(boolean clearCache) {
+
+    LOG.trace("Updating allowed tables for ScanServer");
+    if (clearCache) {
+      context.clearTableListCache();
+    }
+
+    // Remove tables that no longer exist
+    allowedTables.keySet().forEach(tid -> {
+      if (!getContext().getTableIdToNameMap().containsKey(tid)) {
+        LOG.trace("Removing table {} from allowed table map as it no longer 
exists", tid);
+        allowedTables.remove(tid);
+      }
+    });
+
+    final String propName = Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + 
groupName;
+    String allowedTableRegex = getConfiguration().get(propName);
+    if (allowedTableRegex == null) {
+      allowedTableRegex = DEFAULT_SCAN_ALLOWED_PATTERN;
+    }
+
+    if (currentAllowedTableRegex == null) {
+      LOG.trace("Property {} initial value: {}", propName, allowedTableRegex);
+    } else if (currentAllowedTableRegex.equals(allowedTableRegex)) {
+      // Property value has not changed, do nothing
+    } else {
+      LOG.info("Property {} has changed. Old value: {}, new value: {}", 
propName,
+          currentAllowedTableRegex, allowedTableRegex);
+    }
+
+    Pattern allowedTablePattern;
+    try {
+      allowedTablePattern = Pattern.compile(allowedTableRegex);
+      // Regex is valid, store it
+      currentAllowedTableRegex = allowedTableRegex;
+    } catch (PatternSyntaxException e) {
+      LOG.error(
+          "Property {} contains an invalid regular expression. Property value: 
{}. Disabling all tables.",
+          propName, allowedTableRegex);
+      allowedTablePattern = null;
+    }
+
+    Pattern p = allowedTablePattern;
+    context.getTableNameToIdMap().entrySet().forEach(e -> {
+      String tname = e.getKey();
+      TableId tid = e.getValue();
+      if (p == null) {
+        allowedTables.put(tid, Boolean.FALSE);
+      } else {
+        Matcher m = p.matcher(tname);
+        if (m.matches()) {
+          LOG.trace("Table {} can now be scanned via this ScanServer", tname);
+          allowedTables.put(tid, Boolean.TRUE);
+        } else {
+          LOG.trace("Table {} cannot be scanned via this ScanServer", tname);
+          allowedTables.put(tid, Boolean.FALSE);
+        }
+      }
+    });
+
+  }
+
   @SuppressWarnings("unchecked")
   private Map<KeyExtent,TabletMetadata> 
getTabletMetadata(Collection<KeyExtent> extents) {
     if (tabletMetadataCache == null) {
@@ -945,11 +1059,6 @@ public class ScanServer extends AbstractServer
     };
   }
 
-  /* Exposed for testing */
-  protected boolean isSystemUser(TCredentials creds) {
-    return context.getSecurityOperation().isSystemUser(creds);
-  }
-
   @Override
   public InitialScan startScan(TInfo tinfo, TCredentials credentials, 
TKeyExtent textent,
       TRange range, List<TColumn> columns, int batchSize, List<IterInfo> 
ssiList,
@@ -966,9 +1075,10 @@ public class ScanServer extends AbstractServer
 
     KeyExtent extent = getKeyExtent(textent);
 
-    if (extent.isMeta() && !isSystemUser(credentials)) {
-      throw new TException(
-          "Only the system user can perform eventual consistency scans on the 
root and metadata tables");
+    if (!isAllowed(credentials, extent.tableId())) {
+      throw new TApplicationException(TApplicationException.INTERNAL_ERROR,
+          "Scan of table " + extent.tableId() + " disallowed by property: "
+              + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName);
     }
 
     try (ScanReservation reservation =
@@ -1038,9 +1148,10 @@ public class ScanServer extends AbstractServer
     for (Entry<TKeyExtent,List<TRange>> entry : tbatch.entrySet()) {
       KeyExtent extent = getKeyExtent(entry.getKey());
 
-      if (extent.isMeta() && 
!context.getSecurityOperation().isSystemUser(credentials)) {
-        throw new TException(
-            "Only the system user can perform eventual consistency scans on 
the root and metadata tables");
+      if (!isAllowed(credentials, extent.tableId())) {
+        throw new TApplicationException(TApplicationException.INTERNAL_ERROR,
+            "Scan of table " + extent.tableId() + " disallowed by property: "
+                + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + 
this.groupName);
       }
 
       batch.put(extent, entry.getValue());
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
index b254fd3e40..e0a54c8374 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
@@ -109,7 +109,7 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
         .getTimeInMillis(Property.TSERV_SCAN_RESULTS_MAX_TIMEOUT);
   }
 
-  private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
+  public NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
       throws ThriftSecurityException {
     try {
       return server.getContext().getNamespaceId(tableId);
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
index 5a08040872..968f75e1b2 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
@@ -24,7 +24,9 @@ import static org.easymock.EasyMock.partialMockBuilder;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -32,8 +34,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
 
 import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
 import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
@@ -43,6 +48,7 @@ import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
 import org.apache.accumulo.core.dataImpl.thrift.TColumn;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.process.thrift.ServerProcessService;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
@@ -55,6 +61,7 @@ import 
org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
 import org.apache.accumulo.tserver.tablet.SnapshotTablet;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.accumulo.tserver.tablet.TabletBase;
+import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 import org.junit.jupiter.api.Test;
 
@@ -65,7 +72,7 @@ public class ScanServerTest {
     private KeyExtent extent;
     private TabletResolver resolver;
     private ScanReservation reservation;
-    private boolean systemUser;
+    private ConcurrentHashMap<TableId,TableId> allowedTables;
 
     protected TestScanServer(ScanServerOpts opts, String[] args) {
       super(opts, args);
@@ -114,13 +121,17 @@ public class ScanServerTest {
     }
 
     @Override
-    protected boolean isSystemUser(TCredentials creds) {
-      return systemUser;
+    public boolean isShutdownRequested() {
+      return false;
     }
 
     @Override
-    public boolean isShutdownRequested() {
-      return false;
+    protected boolean isAllowed(TCredentials credentials, TableId tid) {
+      return allowedTables.containsKey(tid);
+    }
+
+    public void addAllowedTable(TableId tid) {
+      allowedTables.put(tid, tid);
     }
 
   }
@@ -147,6 +158,8 @@ public class ScanServerTest {
     TabletResolver resolver = createMock(TabletResolver.class);
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+    TableId tid = TableId.of("42");
+    expect(sextent.tableId()).andReturn(tid).once();
     expect(reservation.newTablet(ss, sextent)).andReturn(tablet);
     expect(reservation.getFailures()).andReturn(Map.of()).anyTimes();
     reservation.close();
@@ -157,14 +170,15 @@ public class ScanServerTest {
     expect(handler.continueScan(tinfo, 15, 0L)).andReturn(new ScanResult());
     handler.closeScan(tinfo, 15);
 
-    replay(reservation, handler);
+    replay(reservation, sextent, handler);
 
+    ss.allowedTables = new ConcurrentHashMap<>();
+    ss.addAllowedTable(tid);
     ss.delegate = handler;
     ss.extent = sextent;
     ss.resolver = resolver;
     ss.reservation = reservation;
     ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
-    ss.systemUser = false;
 
     TKeyExtent textent = createMock(TKeyExtent.class);
     InitialScan is = ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, 
titer, ssio, auths,
@@ -172,7 +186,7 @@ public class ScanServerTest {
     assertEquals(15, is.getScanID());
     ss.continueScan(tinfo, is.getScanID(), 0L);
     ss.closeScan(tinfo, is.getScanID());
-    verify(reservation, handler);
+    verify(reservation, sextent, handler);
   }
 
   @Test
@@ -194,18 +208,20 @@ public class ScanServerTest {
     Map<String,String> execHints = new HashMap<>();
     ScanReservation reservation = createMock(ScanReservation.class);
 
-    expect(extent.isMeta()).andReturn(false).anyTimes();
+    TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+    TableId tid = TableId.of("42");
+    expect(extent.tableId()).andReturn(tid).once();
     expect(extent.toThrift()).andReturn(textent).anyTimes();
     expect(reservation.getFailures()).andReturn(Map.of(textent, ranges));
     reservation.close();
 
     replay(extent, reservation);
 
-    TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+    ss.allowedTables = new ConcurrentHashMap<>();
+    ss.addAllowedTable(tid);
     ss.extent = extent;
     ss.delegate = handler;
     ss.reservation = reservation;
-    ss.systemUser = false;
 
     assertThrows(NotServingTabletException.class, () -> {
       ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, 
auths, false, false, 10,
@@ -246,7 +262,8 @@ public class ScanServerTest {
     };
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
-    expect(extent.isMeta()).andReturn(false).anyTimes();
+    TableId tid = TableId.of("42");
+    expect(extent.tableId()).andReturn(tid).once();
     expect(reservation.newTablet(ss, extent)).andReturn(tablet);
     expect(reservation.getTabletMetadataExtents()).andReturn(Set.of(extent));
     expect(reservation.getFailures()).andReturn(Map.of());
@@ -259,12 +276,13 @@ public class ScanServerTest {
 
     replay(extent, reservation, handler);
 
+    ss.allowedTables = new ConcurrentHashMap<>();
+    ss.addAllowedTable(tid);
     ss.delegate = handler;
     ss.extent = extent;
     ss.resolver = resolver;
     ss.reservation = reservation;
     ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
-    ss.systemUser = false;
 
     Map<TKeyExtent,List<TRange>> extents = new HashMap<>();
     extents.put(createMock(TKeyExtent.class), ranges);
@@ -309,7 +327,8 @@ public class ScanServerTest {
     };
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
-    expect(extent.isMeta()).andReturn(false).anyTimes();
+    TableId tid = TableId.of("42");
+    expect(extent.tableId()).andReturn(tid).once();
     expect(reservation.newTablet(ss, extent)).andReturn(tablet).anyTimes();
     expect(reservation.getTabletMetadataExtents()).andReturn(Set.of());
     expect(reservation.getFailures()).andReturn(Map.of(textent, 
ranges)).anyTimes();
@@ -321,12 +340,13 @@ public class ScanServerTest {
 
     replay(extent, reservation, handler);
 
+    ss.allowedTables = new ConcurrentHashMap<>();
+    ss.addAllowedTable(tid);
     ss.delegate = handler;
     ss.extent = extent;
     ss.resolver = resolver;
     ss.reservation = reservation;
     ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
-    ss.systemUser = false;
 
     Map<TKeyExtent,List<TRange>> extents = new HashMap<>();
     extents.put(textent, ranges);
@@ -370,7 +390,6 @@ public class ScanServerTest {
     ss.delegate = handler;
     ss.resolver = resolver;
     ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
-    ss.systemUser = false;
 
     assertThrows(TException.class, () -> {
       ss.startMultiScan(tinfo, tcreds, extents, tcols, titer, ssio, auths, 
false, tsc, 30L,
@@ -380,7 +399,7 @@ public class ScanServerTest {
   }
 
   @Test
-  public void testScanMetaTablesSystemUser() throws Exception {
+  public void testScanDefaultAllowedTables() throws Exception {
     handler = createMock(ThriftScanClientHandler.class);
 
     TInfo tinfo = createMock(TInfo.class);
@@ -399,7 +418,7 @@ public class ScanServerTest {
     TabletResolver resolver = createMock(TabletResolver.class);
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
-    expect(sextent.isMeta()).andReturn(true).anyTimes();
+    expect(sextent.tableId()).andReturn(MetadataTable.ID).once();
     expect(reservation.newTablet(ss, sextent)).andReturn(tablet);
     expect(reservation.getFailures()).andReturn(Map.of()).anyTimes();
     reservation.close();
@@ -412,12 +431,13 @@ public class ScanServerTest {
 
     replay(sextent, reservation, handler);
 
+    ss.allowedTables = new ConcurrentHashMap<>();
+    ss.addAllowedTable(MetadataTable.ID);
     ss.delegate = handler;
     ss.extent = sextent;
     ss.resolver = resolver;
     ss.reservation = reservation;
     ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
-    ss.systemUser = true;
 
     TKeyExtent textent = createMock(TKeyExtent.class);
     InitialScan is = ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, 
titer, ssio, auths,
@@ -430,7 +450,7 @@ public class ScanServerTest {
   }
 
   @Test
-  public void testScanMetaTablesNonSystemUser() throws Exception {
+  public void testScanDisallowedTable() throws Exception {
     handler = createMock(ThriftScanClientHandler.class);
 
     TInfo tinfo = createMock(TInfo.class);
@@ -448,25 +468,40 @@ public class ScanServerTest {
     TabletResolver resolver = createMock(TabletResolver.class);
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
-    expect(sextent.isMeta()).andReturn(true).anyTimes();
+    expect(sextent.tableId()).andReturn(MetadataTable.ID).anyTimes();
     expect(reservation.getFailures()).andReturn(Map.of()).anyTimes();
 
     replay(sextent, reservation, handler);
 
+    ss.allowedTables = new ConcurrentHashMap<>();
+    ss.addAllowedTable(TableId.of("42"));
     ss.delegate = handler;
     ss.extent = sextent;
     ss.resolver = resolver;
     ss.reservation = reservation;
     ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
-    ss.systemUser = false;
 
     TKeyExtent textent = createMock(TKeyExtent.class);
-    assertThrows(TException.class, () -> {
+    TException te = assertThrows(TException.class, () -> {
       ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, 
auths, false, false, 10,
           tsc, 30L, classLoaderContext, execHints, 0L);
     });
+    assertTrue(te instanceof TApplicationException);
+    TApplicationException tae = (TApplicationException) te;
+    assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType());
+    assertTrue(tae.getMessage().contains("disallowed by property"));
     verify(sextent, reservation, handler);
 
   }
 
+  @Test
+  public void testTableNameRegex() {
+    String r = "^(?!accumulo\\.).*$";
+    Pattern p = Pattern.compile(r);
+
+    assertFalse(p.matcher("accumulo.root").matches());
+    assertFalse(p.matcher("accumulo.metadata").matches());
+    assertTrue(p.matcher("test.table").matches());
+  }
+
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java 
b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
new file mode 100644
index 0000000000..e14834cf48
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.scan.ScanServerSelector;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.tserver.ScanServer;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TApplicationException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import com.google.common.collect.Iterables;
+
+public class ScanServerAllowedTablesIT extends SharedMiniClusterBase {
+
+  // @formatter:off
+  private static final String clientConfiguration =
+     "["+
+     " {"+
+     "   \"isDefault\": true,"+
+     "   \"maxBusyTimeout\": \"5m\","+
+     "   \"busyTimeoutMultiplier\": 8,"+
+     "   \"scanTypeActivations\": [],"+
+     "   \"attemptPlans\": ["+
+     "     {"+
+     "       \"servers\": \"3\","+
+     "       \"busyTimeout\": \"33ms\","+
+     "       \"salt\": \"one\""+
+     "     },"+
+     "     {"+
+     "       \"servers\": \"13\","+
+     "       \"busyTimeout\": \"33ms\","+
+     "       \"salt\": \"two\""+
+     "     },"+
+     "     {"+
+     "       \"servers\": \"100%\","+
+     "       \"busyTimeout\": \"33ms\""+
+     "     }"+
+     "   ]"+
+     "  },"+
+     " {"+
+     "   \"isDefault\": false,"+
+     "   \"maxBusyTimeout\": \"5m\","+
+     "   \"busyTimeoutMultiplier\": 8,"+
+     "   \"group\": \"GROUP1\","+
+     "   \"scanTypeActivations\": [\"use_group1\"],"+
+     "   \"attemptPlans\": ["+
+     "     {"+
+     "       \"servers\": \"3\","+
+     "       \"busyTimeout\": \"33ms\","+
+     "       \"salt\": \"one\""+
+     "     },"+
+     "     {"+
+     "       \"servers\": \"13\","+
+     "       \"busyTimeout\": \"33ms\","+
+     "       \"salt\": \"two\""+
+     "     },"+
+     "     {"+
+     "       \"servers\": \"100%\","+
+     "       \"busyTimeout\": \"33ms\""+
+     "     }"+
+     "   ]"+
+     "  }"+
+     "]";
+  // @formatter:on
+
+  public static class SSATITConfiguration implements 
MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+
+      cfg.setNumScanServers(1);
+
+      // allow the ScanServer in the DEFAULT group to only scan tables in 
accumulo namespace
+      cfg.setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey()
+          + ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME, 
"^accumulo\\..*$");
+      // allow the ScanServer in the GROUP1 group to only scan tables created 
with the prefix 'test'
+      cfg.setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", 
"^test.*");
+
+      
cfg.setClientProperty(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() 
+ "profiles",
+          clientConfiguration);
+    }
+
+  }
+
+  @BeforeAll
+  public static void start() throws Exception {
+    SharedMiniClusterBase.startMiniClusterWithConfig(new 
SSATITConfiguration());
+    
SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+        "localhost");
+
+    String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    while (zrw.getChildren(scanServerRoot).size() == 0) {
+      Thread.sleep(500);
+    }
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  public static enum ScannerType {
+    BATCH_SCANNER, SCANNER;
+  }
+
+  private ScannerBase createScanner(AccumuloClient client, ScannerType stype, 
String tableName)
+      throws TableNotFoundException {
+    switch (stype) {
+      case BATCH_SCANNER:
+        BatchScanner batchScanner = client.createBatchScanner(tableName, 
Authorizations.EMPTY);
+        batchScanner.setRanges(Set.of(new Range()));
+        return batchScanner;
+      case SCANNER:
+        Scanner scanner = client.createScanner(tableName, 
Authorizations.EMPTY);
+        scanner.setRange(new Range());
+        return scanner;
+      default:
+        throw new IllegalArgumentException("Unknown scanner type: " + stype);
+    }
+  }
+
+  @SuppressWarnings("unused")
+  @ParameterizedTest
+  @EnumSource(value = ScannerType.class)
+  public void testAllowedTables(ScannerType stype) throws Exception {
+
+    final String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    final ZooKeeper zk = 
getCluster().getServerContext().getZooReaderWriter().getZooKeeper();
+    final String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      // Start the 2nd ScanServer
+      // Bump the number of scan serves that can run to start the GROUP1 scan 
server
+      getCluster().getConfig().setNumScanServers(2);
+      getCluster()._exec(ScanServer.class, ServerType.SCAN_SERVER, Map.of(),
+          new String[] {"-g", "GROUP1"});
+      Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 2);
+      Wait.waitFor(() -> ((ClientContext) 
client).getScanServers().values().stream().anyMatch(
+          (p) -> 
p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) == 
true);
+      Wait.waitFor(() -> ((ClientContext) 
client).getScanServers().values().stream()
+          .anyMatch((p) -> p.getSecond().equals("GROUP1")) == true);
+
+      // Create table with test prefix, load some data
+      final String testTableName = "testAllowedTables" + stype.name();
+      final int ingestedEntryCount =
+          ScanServerIT.createTableAndIngest(client, testTableName, null, 10, 
10, "colf");
+      assertEquals(100, ingestedEntryCount);
+
+      // Using default ScanServer should succeed, only allowed to scan system 
tables
+      try (ScannerBase scanner = createScanner(client, stype, 
MetadataTable.NAME)) {
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        assertTrue(Iterables.size(scanner) > 0);
+      }
+
+      // Using default ScanServer should fail, only allowed to scan system 
tables
+      try (ScannerBase scanner = createScanner(client, stype, testTableName)) {
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        RuntimeException re = assertThrows(RuntimeException.class, () -> 
Iterables.size(scanner));
+        Throwable root = ExceptionUtils.getRootCause(re);
+        assertTrue(root instanceof TApplicationException);
+        TApplicationException tae = (TApplicationException) root;
+        assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType());
+        assertTrue(tae.getMessage().contains("disallowed by property"));
+      }
+
+      // Using GROUP1 ScanServer should fail, only allowed to test tables
+      try (ScannerBase scanner = createScanner(client, stype, 
MetadataTable.NAME)) {
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+        RuntimeException re = assertThrows(RuntimeException.class, () -> 
Iterables.size(scanner));
+        Throwable root = ExceptionUtils.getRootCause(re);
+        assertTrue(root instanceof TApplicationException);
+        TApplicationException tae = (TApplicationException) root;
+        assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType());
+        assertTrue(tae.getMessage().contains("disallowed by property"));
+      }
+
+      // Using GROUP1 ScanServer should succeed
+      try (ScannerBase scanner = createScanner(client, stype, testTableName)) {
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+        assertEquals(100, Iterables.size(scanner));
+      }
+
+      // Change the GROUP1 property so that subsequent test tables don't work
+      getCluster().getServerContext().instanceOperations()
+          .setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", 
"^foo.*");
+
+      // Using GROUP1 ScanServer should fail, only allowed to test 'test*' 
tables
+      try (ScannerBase scanner = createScanner(client, stype, 
MetadataTable.NAME)) {
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+        RuntimeException re = assertThrows(RuntimeException.class, () -> 
Iterables.size(scanner));
+        Throwable root = ExceptionUtils.getRootCause(re);
+        assertTrue(root instanceof TApplicationException);
+        TApplicationException tae = (TApplicationException) root;
+        assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType());
+        assertTrue(tae.getMessage().contains("disallowed by property"));
+      }
+
+      // Using GROUP1 ScanServer should fail as the property was changed
+      try (ScannerBase scanner = createScanner(client, stype, testTableName)) {
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+        // Try multiple times waiting for the server to pick up the property 
change
+        Wait.waitFor(() -> {
+          try {
+            var unused = Iterables.size(scanner);
+            return false;
+          } catch (RuntimeException e) {
+            return true;
+          }
+        });
+      }
+
+      // Change the GROUP1 property so that subsequent test tables do work
+      getCluster().getServerContext().instanceOperations()
+          .setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", 
"^test.*");
+
+      // Using GROUP1 ScanServer should succeed as the property was changed 
back
+      try (ScannerBase scanner = createScanner(client, stype, testTableName)) {
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+        // Try multiple times waiting for the server to pick up the property 
change
+        Wait.waitFor(() -> {
+          try {
+            int size = Iterables.size(scanner);
+            return size == 100;
+          } catch (RuntimeException e) {
+            return false;
+          }
+        });
+
+      }
+
+    }
+
+  }
+
+}


Reply via email to