This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 5266aa5aca24f168b36c06b5f644e5bf2038b010 Merge: 28d152348d b38a2dbf65 Author: Dave Marion <[email protected]> AuthorDate: Wed Feb 25 16:17:58 2026 +0000 Merge branch '2.1' .../org/apache/accumulo/core/conf/Property.java | 16 ++ .../org/apache/accumulo/tserver/ScanServer.java | 152 +++++++++++-- .../accumulo/tserver/ThriftScanClientHandler.java | 2 +- .../apache/accumulo/tserver/ScanServerTest.java | 72 +++++-- .../accumulo/shell/commands/DeleteManyCommand.java | 9 + .../accumulo/shell/commands/GrepCommand.java | 9 + .../accumulo/shell/commands/ScanCommand.java | 2 +- .../accumulo/test/ScanServerAllowedTablesIT.java | 236 +++++++++++++++++++++ .../test/ScanServerGroupConfigurationIT.java | 4 +- 9 files changed, 466 insertions(+), 36 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index dd311d2eda,90602ad616..dd3aa1920a --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -624,10 -560,16 +624,26 @@@ public enum Property PropertyType.TIMEDURATION, "The amount of time a scan reference is unused before its deleted from metadata table.", "2.1.0"), - @Experimental - SSERV_SCAN_ALLOWED_TABLES("sserver.scan.allowed.tables.group.", null, PropertyType.PREFIX, ++ SSERV_SCAN_ALLOWED_TABLES("sserver.scan.allowed.tables", "^(?!accumulo\\.).*$", ++ PropertyType.STRING, ++ "A regular expression that determines which tables are allowed to be scanned for" ++ + " servers in the specified group. The property name should end with the scan server" ++ + " group and the property value should take into account the table namespace and name." ++ + " The default value disallows scans on tables in the accumulo namespace.", ++ "4.0.0"), ++ @Deprecated(since = "4.0.0") ++ @ReplacedBy(property = SSERV_SCAN_ALLOWED_TABLES) ++ SSERV_SCAN_ALLOWED_TABLES_DEPRECATED("sserver.scan.allowed.tables.group.", null, ++ PropertyType.PREFIX, + "A regular expression that determines which tables are allowed to be scanned for" + + " servers in the specified group. The property name should end with the scan server" + + " group and the property value should take into account the table namespace and name." + + " The default value disallows scans on tables in the accumulo namespace.", + "2.1.5"), - @Experimental SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the thrift server thread pool.", "2.1.0"), + SSERV_WAL_SORT_MAX_CONCURRENT("sserver.wal.sort.concurrent.max", "2", PropertyType.COUNT, + "The maximum number of threads to use to sort logs during recovery.", "4.0.0"), // properties that are specific to tablet server behavior TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers.", "1.3.5"), diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index fd9f3fa37e,53b121f08d..14e2b77d1b --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -56,7 -57,8 +60,8 @@@ import org.apache.accumulo.core.clientI import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.conf.cluster.ClusterConfigParser; +import org.apache.accumulo.core.conf.SiteConfiguration; + import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan; import org.apache.accumulo.core.dataImpl.thrift.InitialScan; @@@ -82,27 -79,30 +87,28 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; -import org.apache.accumulo.core.process.thrift.ServerProcessService; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; -import org.apache.accumulo.core.spi.scan.ScanServerSelector; -import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; +import org.apache.accumulo.core.tabletscan.thrift.ActiveScan; +import org.apache.accumulo.core.tabletscan.thrift.ScanServerBusyException; +import org.apache.accumulo.core.tabletscan.thrift.TSampleNotPresentException; +import org.apache.accumulo.core.tabletscan.thrift.TSamplerConfiguration; +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService; +import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException; -import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException; -import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration; -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; -import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException; -import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.HostAndPort; + import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.AbstractServer; -import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.client.ClientServiceHandler; +import org.apache.accumulo.server.compaction.PausedCompactionMetrics; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; import org.apache.accumulo.server.security.SecurityUtil; @@@ -118,8 -117,10 +124,9 @@@ import org.apache.accumulo.tserver.sess import org.apache.accumulo.tserver.tablet.SnapshotTablet; import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.accumulo.tserver.tablet.TabletBase; + import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; -import org.apache.zookeeper.KeeperException; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -202,12 -218,19 +209,14 @@@ public class ScanServer extends Abstrac private ScanServerMetrics scanServerMetrics; private BlockCacheMetrics blockCacheMetrics; - private final ZooCache managerLockCache; - - private final String groupName; - + private final ConcurrentHashMap<TableId,Boolean> allowedTables = new ConcurrentHashMap<>(); + private volatile String currentAllowedTableRegex; + - public ScanServer(ScanServerOpts opts, String[] args) { - super("sserver", opts, args); - + public ScanServer(ServerOpts opts, String[] args) { + super(ServerId.Type.SCAN_SERVER, opts, ServerContext::new, args); - context = super.getContext(); - log.info("Version " + Constants.VERSION); - log.info("Instance " + getContext().getInstanceID()); + LOG.info("Version " + Constants.VERSION); + LOG.info("Instance " + getContext().getInstanceID()); this.sessionManager = new SessionManager(context); this.resourceManager = new TabletServerResourceManager(context, this); @@@ -361,9 -400,13 +370,10 @@@ } SecurityUtil.serverLogin(getConfiguration()); + updateAllowedTables(false); - ServerAddress address = null; try { - address = startScanServerClientService(); - updateAdvertiseAddress(address.getAddress()); - clientAddress = getAdvertiseAddress(); + startScanServerClientService(); } catch (UnknownHostException e1) { throw new RuntimeException("Failed to start the scan server client service", e1); } @@@ -377,78 -420,177 +387,199 @@@ resourceManager.getDataCache(), resourceManager.getSummaryCache()); metricsInfo.addMetricsProducers(this, scanMetrics, scanServerMetrics, blockCacheMetrics); - metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), - clientAddress, groupName)); - // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close ServiceLock lock = announceExistence(); + this.getContext().setServiceLock(lock); - try { - while (!isShutdownRequested()) { - if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); - break; - } - try { - Thread.sleep(1000); - updateIdleStatus(sessionManager.getActiveScans().isEmpty() - && tabletMetadataCache.estimatedSize() == 0); - updateAllowedTables(false); - } catch (InterruptedException e) { - LOG.info("Interrupt Exception received, shutting down"); - gracefulShutdown(getContext().rpcCreds()); - } - } - } finally { - // Wait for scans to got to zero - while (!sessionManager.getActiveScans().isEmpty()) { - LOG.debug("Waiting on {} active scans to complete.", - sessionManager.getActiveScans().size()); - UtilWaitThread.sleep(1000); + int threadPoolSize = getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT); + if (threadPoolSize > 0) { + final LogSorter logSorter = new LogSorter(this); + metricsInfo.addMetricsProducers(logSorter); + try { + // Attempt to process all existing log sorting work and start a background + // thread to look for log sorting work in the future + logSorter.startWatchingForRecoveryLogs(threadPoolSize); + } catch (Exception ex) { + LOG.error("Error starting LogSorter"); + throw new RuntimeException(ex); } + } else { + LOG.info( + "Log sorting for tablet recovery is disabled, SSERV_WAL_SORT_MAX_CONCURRENT is less than 1."); + } - LOG.debug("Stopping Thrift Servers"); - address.server.stop(); + metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), + getAdvertiseAddress(), getResourceGroup())); - try { - LOG.debug("Removing server scan references"); - this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(), - serverLockUUID); - } catch (Exception e) { - LOG.warn("Failed to remove scan server refs from metadata location", e); + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("Server process thread has been interrupted, shutting down"); + break; } - try { - LOG.debug("Closing filesystems"); - VolumeManager mgr = getContext().getVolumeManager(); - if (null != mgr) { - mgr.close(); - } - } catch (IOException e) { - LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); + Thread.sleep(1000); + updateIdleStatus( + sessionManager.getActiveScans().isEmpty() && tabletMetadataCache.estimatedSize() == 0); ++ updateAllowedTables(false); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + gracefulShutdown(getContext().rpcCreds()); } + } - if (tmCacheExecutor != null) { - LOG.debug("Shutting down TabletMetadataCache executor"); - tmCacheExecutor.shutdownNow(); - } + // Wait for scans to get to zero + while (!sessionManager.getActiveScans().isEmpty()) { + LOG.debug("Waiting on {} active scans to complete.", sessionManager.getActiveScans().size()); + UtilWaitThread.sleep(1000); + } - gcLogger.logGCInfo(getConfiguration()); - super.close(); - getShutdownComplete().set(true); - LOG.info("stop requested. exiting ... "); - try { - if (null != lock) { - lock.unlock(); - } - } catch (Exception e) { - LOG.warn("Failed to release scan server lock", e); + LOG.debug("Stopping Thrift Servers"); + getThriftServer().stop(); + + try { + LOG.info("Removing server scan references"); + this.getContext().getAmple().scanServerRefs().delete(getAdvertiseAddress().toString(), + serverLockUUID); + } catch (Exception e) { + LOG.warn("Failed to remove scan server refs from metadata location", e); + } + + try { + LOG.debug("Closing filesystems"); + VolumeManager mgr = getContext().getVolumeManager(); + if (null != mgr) { + mgr.close(); } + } catch (IOException e) { + LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); + } + if (tmCacheExecutor != null) { + LOG.debug("Shutting down TabletMetadataCache executor"); + tmCacheExecutor.shutdownNow(); } } + // Visible for testing + protected boolean isAllowed(TCredentials credentials, TableId tid) + throws ThriftSecurityException { + Boolean result = allowedTables.get(tid); + if (result == null) { + - final Retry retry = - Retry.builder().maxRetries(10).retryAfter(1, SECONDS).incrementBy(0, SECONDS) - .maxWait(2, SECONDS).backOffFactor(1.0).logInterval(3, SECONDS).createRetry(); ++ final Retry retry = Retry.builder().maxRetries(10).retryAfter(Duration.ofSeconds(1)) ++ .incrementBy(Duration.ZERO).maxWait(Duration.ofSeconds(2)).backOffFactor(1.0) ++ .logInterval(Duration.ofSeconds(3)).createRetry(); + + while (result == null && retry.canRetry()) { + try { + retry.waitForNextAttempt(LOG, + "Allowed tables mapping does not contain an entry for table: " + tid + + ", refreshing table..."); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while waiting for next retry", e); + break; + } + // Clear the cache and try again, maybe there + // is a race condition in table creation and scan + updateAllowedTables(true); + // validate that the table exists, else throw + delegate.getNamespaceId(credentials, tid); + result = allowedTables.get(tid); + retry.useRetry(); + } + + if (result == null) { + // Ran out of retries + throw new IllegalStateException( + "Unable to get allowed table mapping for table: " + tid + " within 10s"); + } + } + return result; + } + + private synchronized void updateAllowedTables(boolean clearCache) { + + LOG.trace("Updating allowed tables for ScanServer"); + if (clearCache) { + context.clearTableListCache(); + } + + // Remove tables that no longer exist + allowedTables.keySet().forEach(tid -> { - if (!getContext().getTableIdToNameMap().containsKey(tid)) { ++ if (!getContext().tableNodeExists(tid)) { + LOG.trace("Removing table {} from allowed table map as it no longer exists", tid); + allowedTables.remove(tid); + } + }); + - final String propName = Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + groupName; - String allowedTableRegex = getConfiguration().get(propName); ++ @SuppressWarnings("deprecation") ++ final String oldPropName = Property.SSERV_SCAN_ALLOWED_TABLES_DEPRECATED.getKey() ++ + this.getResourceGroup().canonical(); ++ final String oldPropVal = getConfiguration().get(oldPropName); ++ ++ final String propName = Property.SSERV_SCAN_ALLOWED_TABLES.getKey(); ++ final String propVal = getConfiguration().get(propName); ++ ++ String allowedTableRegex = null; ++ if (propVal != null && oldPropVal != null) { ++ LOG.warn( ++ "Property {} is deprecated, using value from replacement property {}. Remove old property from config.", ++ oldPropName, propName); ++ allowedTableRegex = propVal; ++ } else if (propVal == null && oldPropVal != null) { ++ LOG.warn("Property {} is deprecated, please use the newer replacement property {}", ++ oldPropName, propName); ++ allowedTableRegex = oldPropVal; ++ } else if (propVal != null && oldPropVal == null) { ++ allowedTableRegex = propVal; ++ } ++ + if (allowedTableRegex == null) { - allowedTableRegex = DEFAULT_SCAN_ALLOWED_PATTERN; ++ allowedTableRegex = Property.SSERV_SCAN_ALLOWED_TABLES.getDefaultValue(); + } + + if (currentAllowedTableRegex == null) { + LOG.trace("Property {} initial value: {}", propName, allowedTableRegex); + } else if (currentAllowedTableRegex.equals(allowedTableRegex)) { + // Property value has not changed, do nothing + } else { + LOG.info("Property {} has changed. Old value: {}, new value: {}", propName, + currentAllowedTableRegex, allowedTableRegex); + } + + Pattern allowedTablePattern; + try { + allowedTablePattern = Pattern.compile(allowedTableRegex); + // Regex is valid, store it + currentAllowedTableRegex = allowedTableRegex; + } catch (PatternSyntaxException e) { + LOG.error( + "Property {} contains an invalid regular expression. Property value: {}. Disabling all tables.", + propName, allowedTableRegex); + allowedTablePattern = null; + } + + Pattern p = allowedTablePattern; - context.getTableNameToIdMap().entrySet().forEach(e -> { - String tname = e.getKey(); - TableId tid = e.getValue(); ++ context.createTableIdToQualifiedNameMap().entrySet().forEach(te -> { ++ String tname = te.getValue(); ++ TableId tid = te.getKey(); ++ LOG.info("Table Mapping: {} -> {}", tid, tname); + if (p == null) { + allowedTables.put(tid, Boolean.FALSE); + } else { + Matcher m = p.matcher(tname); + if (m.matches()) { + LOG.trace("Table {} can now be scanned via this ScanServer", tname); + allowedTables.put(tid, Boolean.TRUE); + } else { + LOG.trace("Table {} cannot be scanned via this ScanServer", tname); + allowedTables.put(tid, Boolean.FALSE); + } + } + }); - + } + @SuppressWarnings("unchecked") private Map<KeyExtent,TabletMetadata> getTabletMetadata(Collection<KeyExtent> extents) { if (tabletMetadataCache == null) { @@@ -937,9 -1075,10 +1063,10 @@@ KeyExtent extent = getKeyExtent(textent); - if (extent.isSystemTable() && !isSystemUser(credentials)) { - throw new TException( - "Only the system user can perform eventual consistency scans on the root and metadata tables"); + if (!isAllowed(credentials, extent.tableId())) { + throw new TApplicationException(TApplicationException.INTERNAL_ERROR, + "Scan of table " + extent.tableId() + " disallowed by property: " - + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName); ++ + Property.SSERV_SCAN_ALLOWED_TABLES.getKey()); } try (ScanReservation reservation = @@@ -1010,9 -1148,10 +1137,10 @@@ for (Entry<TKeyExtent,List<TRange>> entry : tbatch.entrySet()) { KeyExtent extent = getKeyExtent(entry.getKey()); - if (extent.isSystemTable() && !isSystemUser(credentials)) { - throw new TException( - "Only the system user can perform eventual consistency scans on the root and metadata tables"); + if (!isAllowed(credentials, extent.tableId())) { + throw new TApplicationException(TApplicationException.INTERNAL_ERROR, + "Scan of table " + extent.tableId() + " disallowed by property: " - + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName); ++ + Property.SSERV_SCAN_ALLOWED_TABLES.getKey()); } batch.put(extent, entry.getValue()); diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java index 6f199d2c4d,968f75e1b2..c9a5a83032 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java @@@ -32,10 -34,10 +34,12 @@@ import java.util.HashMap import java.util.List; import java.util.Map; import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.regex.Pattern; +import org.apache.accumulo.core.cli.ServerOpts; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan; @@@ -56,7 -61,7 +60,8 @@@ import org.apache.accumulo.tserver.sess import org.apache.accumulo.tserver.tablet.SnapshotTablet; import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.accumulo.tserver.tablet.TabletBase; +import org.apache.hadoop.io.Text; + import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.junit.jupiter.api.Test; @@@ -67,9 -72,9 +72,9 @@@ public class ScanServerTest private KeyExtent extent; private TabletResolver resolver; private ScanReservation reservation; - private boolean systemUser; + private ConcurrentHashMap<TableId,TableId> allowedTables; - protected TestScanServer(ScanServerOpts opts, String[] args) { + protected TestScanServer(ServerOpts opts, String[] args) { super(opts, args); } @@@ -137,9 -142,9 +146,10 @@@ public void testScan() throws Exception { handler = createMock(ThriftScanClientHandler.class); ++ TableId tid = TableId.of("1"); TInfo tinfo = createMock(TInfo.class); TCredentials tcreds = createMock(TCredentials.class); - KeyExtent sextent = newExtent(TableId.of("1")); - KeyExtent sextent = createMock(KeyExtent.class); ++ KeyExtent sextent = newExtent(tid); ScanReservation reservation = createMock(ScanReservation.class); SnapshotTablet tablet = createMock(SnapshotTablet.class); TRange trange = createMock(TRange.class); @@@ -163,13 -170,15 +173,14 @@@ expect(handler.continueScan(tinfo, 15, 0L)).andReturn(new ScanResult()); handler.closeScan(tinfo, 15); - replay(reservation, sextent, handler); + replay(reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(tid); ss.delegate = handler; ss.extent = sextent; ss.resolver = resolver; ss.reservation = reservation; - ss.systemUser = false; - ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); TKeyExtent textent = createMock(TKeyExtent.class); InitialScan is = ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, @@@ -184,10 -193,10 +195,11 @@@ public void testScanTabletLoadFailure() throws Exception { handler = createMock(ThriftScanClientHandler.class); ++ TableId tid = TableId.of("1"); TInfo tinfo = createMock(TInfo.class); TCredentials tcreds = createMock(TCredentials.class); - KeyExtent extent = newExtent(TableId.of("1")); - KeyExtent extent = createMock(KeyExtent.class); - TKeyExtent textent = createMock(TKeyExtent.class); ++ KeyExtent extent = newExtent(tid); + TKeyExtent textent = extent.toThrift(); TRange trange = createMock(TRange.class); List<TRange> ranges = new ArrayList<>(); List<TColumn> tcols = new ArrayList<>(); @@@ -199,12 -208,17 +211,14 @@@ Map<String,String> execHints = new HashMap<>(); ScanReservation reservation = createMock(ScanReservation.class); + TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); - TableId tid = TableId.of("42"); - expect(extent.tableId()).andReturn(tid).once(); - expect(extent.toThrift()).andReturn(textent).anyTimes(); expect(reservation.getFailures()).andReturn(Map.of(textent, ranges)); reservation.close(); - replay(extent, reservation); + replay(reservation); - TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(tid); ss.extent = extent; ss.delegate = handler; ss.reservation = reservation; @@@ -222,10 -235,10 +235,11 @@@ public void testBatchScan() throws Exception { handler = createMock(ThriftScanClientHandler.class); ++ TableId tid = TableId.of("1"); TInfo tinfo = createMock(TInfo.class); TCredentials tcreds = createMock(TCredentials.class); List<TRange> ranges = new ArrayList<>(); - KeyExtent extent = newExtent(TableId.of("1")); - KeyExtent extent = createMock(KeyExtent.class); ++ KeyExtent extent = newExtent(tid); ScanReservation reservation = createMock(ScanReservation.class); SnapshotTablet tablet = createMock(SnapshotTablet.class); Map<KeyExtent,List<TRange>> batch = new HashMap<>(); @@@ -259,13 -274,15 +273,14 @@@ expect(handler.continueMultiScan(tinfo, 15, 0L)).andReturn(new MultiScanResult()); handler.closeMultiScan(tinfo, 15); - replay(extent, reservation, handler); + replay(reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(tid); ss.delegate = handler; ss.extent = extent; ss.resolver = resolver; ss.reservation = reservation; - ss.systemUser = false; - ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); Map<TKeyExtent,List<TRange>> extents = new HashMap<>(); extents.put(createMock(TKeyExtent.class), ranges); @@@ -282,11 -299,11 +297,12 @@@ public void testBatchScanTabletLoadFailure() throws Exception { handler = createMock(ThriftScanClientHandler.class); ++ TableId tid = TableId.of("1"); TInfo tinfo = createMock(TInfo.class); TCredentials tcreds = createMock(TCredentials.class); List<TRange> ranges = new ArrayList<>(); - KeyExtent extent = newExtent(TableId.of("1")); - KeyExtent extent = createMock(KeyExtent.class); - TKeyExtent textent = createMock(TKeyExtent.class); ++ KeyExtent extent = newExtent(tid); + TKeyExtent textent = extent.toThrift(); ScanReservation reservation = createMock(ScanReservation.class); SnapshotTablet tablet = createMock(SnapshotTablet.class); Map<KeyExtent,List<TRange>> batch = new HashMap<>(); @@@ -319,13 -338,15 +335,14 @@@ expect(handler.startMultiScan(tinfo, tcreds, tcols, titer, batch, ssio, auths, false, tsc, 30L, classLoaderContext, execHints, resolver, 0L)).andReturn(ims); - replay(extent, reservation, handler); + replay(reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(tid); ss.delegate = handler; ss.extent = extent; ss.resolver = resolver; ss.reservation = reservation; - ss.systemUser = false; - ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); Map<TKeyExtent,List<TRange>> extents = new HashMap<>(); extents.put(textent, ranges); @@@ -368,7 -389,7 +385,6 @@@ TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); ss.delegate = handler; ss.resolver = resolver; - ss.systemUser = false; - ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); assertThrows(TException.class, () -> { ss.startMultiScan(tinfo, tcreds, extents, tcols, titer, ssio, auths, false, tsc, 30L, @@@ -407,13 -429,15 +423,14 @@@ expect(handler.continueScan(tinfo, 15, 0L)).andReturn(new ScanResult()); handler.closeScan(tinfo, 15); - replay(sextent, reservation, handler); + replay(reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); - ss.addAllowedTable(MetadataTable.ID); ++ ss.addAllowedTable(SystemTables.METADATA.tableId()); ss.delegate = handler; ss.extent = sextent; ss.resolver = resolver; ss.reservation = reservation; - ss.systemUser = true; - ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); TKeyExtent textent = createMock(TKeyExtent.class); InitialScan is = ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, @@@ -444,22 -468,29 +461,27 @@@ TabletResolver resolver = createMock(TabletResolver.class); TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); - expect(sextent.tableId()).andReturn(MetadataTable.ID).anyTimes(); expect(reservation.getFailures()).andReturn(Map.of()).anyTimes(); - replay(sextent, reservation, handler); + replay(reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(TableId.of("42")); ss.delegate = handler; ss.extent = sextent; ss.resolver = resolver; ss.reservation = reservation; - ss.systemUser = false; - ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); TKeyExtent textent = createMock(TKeyExtent.class); - assertThrows(TException.class, () -> { + TException te = assertThrows(TException.class, () -> { ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, false, false, 10, tsc, 30L, classLoaderContext, execHints, 0L); }); + assertTrue(te instanceof TApplicationException); + TApplicationException tae = (TApplicationException) te; + assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType()); + assertTrue(tae.getMessage().contains("disallowed by property")); - verify(sextent, reservation, handler); + verify(reservation, handler); } diff --cc shell/src/main/java/org/apache/accumulo/shell/commands/DeleteManyCommand.java index f2c21057a6,e0f0cbe1f5..e217f045e2 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteManyCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteManyCommand.java @@@ -41,6 -41,15 +41,11 @@@ public class DeleteManyCommand extends throws Exception { final String tableName = OptUtil.getTableOpt(cl, shellState); - @SuppressWarnings("deprecation") - final org.apache.accumulo.core.util.interpret.ScanInterpreter interpreter = - getInterpreter(cl, tableName, shellState); - + String classLoaderContext = null; + if (cl.hasOption(contextOpt.getOpt())) { + classLoaderContext = cl.getOptionValue(contextOpt.getOpt()); + } + // handle first argument, if present, the authorizations list to // scan with final Authorizations auths = getAuths(cl, shellState); diff --cc test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java index 0000000000,e14834cf48..9c2e20d353 mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java @@@ -1,0 -1,292 +1,236 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.test; + + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertThrows; + import static org.junit.jupiter.api.Assertions.assertTrue; + + import java.util.Map; + import java.util.Set; + -import org.apache.accumulo.core.Constants; + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.BatchScanner; + import org.apache.accumulo.core.client.Scanner; + import org.apache.accumulo.core.client.ScannerBase; + import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; + import org.apache.accumulo.core.client.TableNotFoundException; ++import org.apache.accumulo.core.client.admin.ResourceGroupOperations; + import org.apache.accumulo.core.clientImpl.ClientContext; + import org.apache.accumulo.core.conf.ClientProperty; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.metadata.MetadataTable; ++import org.apache.accumulo.core.data.ResourceGroupId; ++import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; ++import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate; ++import org.apache.accumulo.core.metadata.SystemTables; + import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.spi.scan.ScanServerSelector; + import org.apache.accumulo.harness.MiniClusterConfigurationCallback; + import org.apache.accumulo.harness.SharedMiniClusterBase; + import org.apache.accumulo.minicluster.ServerType; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.test.util.Wait; -import org.apache.accumulo.tserver.ScanServer; + import org.apache.commons.lang3.exception.ExceptionUtils; + import org.apache.hadoop.conf.Configuration; + import org.apache.thrift.TApplicationException; -import org.apache.zookeeper.ZooKeeper; + import org.junit.jupiter.api.AfterAll; + import org.junit.jupiter.api.BeforeAll; + import org.junit.jupiter.params.ParameterizedTest; + import org.junit.jupiter.params.provider.EnumSource; + + import com.google.common.collect.Iterables; + + public class ScanServerAllowedTablesIT extends SharedMiniClusterBase { + - // @formatter:off - private static final String clientConfiguration = - "["+ - " {"+ - " \"isDefault\": true,"+ - " \"maxBusyTimeout\": \"5m\","+ - " \"busyTimeoutMultiplier\": 8,"+ - " \"scanTypeActivations\": [],"+ - " \"attemptPlans\": ["+ - " {"+ - " \"servers\": \"3\","+ - " \"busyTimeout\": \"33ms\","+ - " \"salt\": \"one\""+ - " },"+ - " {"+ - " \"servers\": \"13\","+ - " \"busyTimeout\": \"33ms\","+ - " \"salt\": \"two\""+ - " },"+ - " {"+ - " \"servers\": \"100%\","+ - " \"busyTimeout\": \"33ms\""+ - " }"+ - " ]"+ - " },"+ - " {"+ - " \"isDefault\": false,"+ - " \"maxBusyTimeout\": \"5m\","+ - " \"busyTimeoutMultiplier\": 8,"+ - " \"group\": \"GROUP1\","+ - " \"scanTypeActivations\": [\"use_group1\"],"+ - " \"attemptPlans\": ["+ - " {"+ - " \"servers\": \"3\","+ - " \"busyTimeout\": \"33ms\","+ - " \"salt\": \"one\""+ - " },"+ - " {"+ - " \"servers\": \"13\","+ - " \"busyTimeout\": \"33ms\","+ - " \"salt\": \"two\""+ - " },"+ - " {"+ - " \"servers\": \"100%\","+ - " \"busyTimeout\": \"33ms\""+ - " }"+ - " ]"+ - " }"+ - "]"; - // @formatter:on ++ private static final String GROUP_NAME = "GROUP1"; ++ private static ResourceGroupId rgid = ResourceGroupId.of(GROUP_NAME); + + public static class SSATITConfiguration implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + - cfg.setNumScanServers(1); - - // allow the ScanServer in the DEFAULT group to only scan tables in accumulo namespace - cfg.setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() - + ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME, "^accumulo\\..*$"); - // allow the ScanServer in the GROUP1 group to only scan tables created with the prefix 'test' - cfg.setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", "^test.*"); ++ cfg.getClusterServerConfiguration().setNumDefaultScanServers(1); ++ cfg.getClusterServerConfiguration().addScanServerResourceGroup("GROUP1", 1); + + cfg.setClientProperty(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + "profiles", - clientConfiguration); ++ ScanServerGroupConfigurationIT.clientConfiguration); + } + + } + + @BeforeAll + public static void start() throws Exception { + SharedMiniClusterBase.startMiniClusterWithConfig(new SSATITConfiguration()); + SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, + "localhost"); + - String zooRoot = getCluster().getServerContext().getZooKeeperRoot(); - ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter(); - String scanServerRoot = zooRoot + Constants.ZSSERVERS; ++ try (var client = Accumulo.newClient().from(getClientProps()).build()) { ++ @SuppressWarnings("resource") ++ final ClientContext cc = (ClientContext) client; ++ ++ Wait.waitFor(() -> cc.getServerPaths().getScanServer( ++ ResourceGroupPredicate.exact(ResourceGroupId.DEFAULT), AddressSelector.all(), true).size() ++ == 1); ++ Wait.waitFor(() -> cc.getServerPaths() ++ .getScanServer(ResourceGroupPredicate.exact(rgid), AddressSelector.all(), true).size() ++ == 1); ++ ++ final ResourceGroupOperations rgOps = client.resourceGroupOperations(); + - while (zrw.getChildren(scanServerRoot).size() == 0) { - Thread.sleep(500); ++ rgOps.setProperty(ResourceGroupId.DEFAULT, Property.SSERV_SCAN_ALLOWED_TABLES.getKey(), ++ "^accumulo\\..*$"); ++ ++ rgOps.create(rgid); ++ rgOps.setProperty(rgid, Property.SSERV_SCAN_ALLOWED_TABLES.getKey(), "^test.*"); + } ++ + } + + @AfterAll + public static void stop() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + public static enum ScannerType { + BATCH_SCANNER, SCANNER; + } + + private ScannerBase createScanner(AccumuloClient client, ScannerType stype, String tableName) + throws TableNotFoundException { + switch (stype) { + case BATCH_SCANNER: + BatchScanner batchScanner = client.createBatchScanner(tableName, Authorizations.EMPTY); + batchScanner.setRanges(Set.of(new Range())); + return batchScanner; + case SCANNER: + Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY); + scanner.setRange(new Range()); + return scanner; + default: + throw new IllegalArgumentException("Unknown scanner type: " + stype); + } + } + + @SuppressWarnings("unused") + @ParameterizedTest + @EnumSource(value = ScannerType.class) + public void testAllowedTables(ScannerType stype) throws Exception { + - final String zooRoot = getCluster().getServerContext().getZooKeeperRoot(); - final ZooKeeper zk = getCluster().getServerContext().getZooReaderWriter().getZooKeeper(); - final String scanServerRoot = zooRoot + Constants.ZSSERVERS; - + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + - // Start the 2nd ScanServer - // Bump the number of scan serves that can run to start the GROUP1 scan server - getCluster().getConfig().setNumScanServers(2); - getCluster()._exec(ScanServer.class, ServerType.SCAN_SERVER, Map.of(), - new String[] {"-g", "GROUP1"}); - Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 2); - Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( - (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) == true); - Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream() - .anyMatch((p) -> p.getSecond().equals("GROUP1")) == true); ++ final ResourceGroupOperations rgOps = client.resourceGroupOperations(); + + // Create table with test prefix, load some data + final String testTableName = "testAllowedTables" + stype.name(); + final int ingestedEntryCount = + ScanServerIT.createTableAndIngest(client, testTableName, null, 10, 10, "colf"); + assertEquals(100, ingestedEntryCount); + + // Using default ScanServer should succeed, only allowed to scan system tables - try (ScannerBase scanner = createScanner(client, stype, MetadataTable.NAME)) { ++ try (ScannerBase scanner = createScanner(client, stype, SystemTables.METADATA.tableName())) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertTrue(Iterables.size(scanner) > 0); + } + + // Using default ScanServer should fail, only allowed to scan system tables + try (ScannerBase scanner = createScanner(client, stype, testTableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + RuntimeException re = assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + Throwable root = ExceptionUtils.getRootCause(re); + assertTrue(root instanceof TApplicationException); + TApplicationException tae = (TApplicationException) root; + assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType()); + assertTrue(tae.getMessage().contains("disallowed by property")); + } + + // Using GROUP1 ScanServer should fail, only allowed to test tables - try (ScannerBase scanner = createScanner(client, stype, MetadataTable.NAME)) { ++ try (ScannerBase scanner = createScanner(client, stype, SystemTables.METADATA.tableName())) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + RuntimeException re = assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + Throwable root = ExceptionUtils.getRootCause(re); + assertTrue(root instanceof TApplicationException); + TApplicationException tae = (TApplicationException) root; + assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType()); + assertTrue(tae.getMessage().contains("disallowed by property")); + } + + // Using GROUP1 ScanServer should succeed + try (ScannerBase scanner = createScanner(client, stype, testTableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + assertEquals(100, Iterables.size(scanner)); + } + + // Change the GROUP1 property so that subsequent test tables don't work - getCluster().getServerContext().instanceOperations() - .setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", "^foo.*"); ++ rgOps.setProperty(rgid, Property.SSERV_SCAN_ALLOWED_TABLES.getKey(), "^foo.*"); + + // Using GROUP1 ScanServer should fail, only allowed to test 'test*' tables - try (ScannerBase scanner = createScanner(client, stype, MetadataTable.NAME)) { ++ try (ScannerBase scanner = createScanner(client, stype, SystemTables.METADATA.tableName())) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + RuntimeException re = assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + Throwable root = ExceptionUtils.getRootCause(re); + assertTrue(root instanceof TApplicationException); + TApplicationException tae = (TApplicationException) root; + assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType()); + assertTrue(tae.getMessage().contains("disallowed by property")); + } + + // Using GROUP1 ScanServer should fail as the property was changed + try (ScannerBase scanner = createScanner(client, stype, testTableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + // Try multiple times waiting for the server to pick up the property change + Wait.waitFor(() -> { + try { + var unused = Iterables.size(scanner); + return false; + } catch (RuntimeException e) { + return true; + } + }); + } + + // Change the GROUP1 property so that subsequent test tables do work - getCluster().getServerContext().instanceOperations() - .setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", "^test.*"); ++ rgOps.setProperty(rgid, Property.SSERV_SCAN_ALLOWED_TABLES.getKey(), "^test.*"); + + // Using GROUP1 ScanServer should succeed as the property was changed back + try (ScannerBase scanner = createScanner(client, stype, testTableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + // Try multiple times waiting for the server to pick up the property change + Wait.waitFor(() -> { + try { + int size = Iterables.size(scanner); + return size == 100; + } catch (RuntimeException e) { + return false; + } + }); + + } + + } + + } + + } diff --cc test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java index b5ea2498e2,c18e6e1aff..725b098886 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java @@@ -46,7 -48,9 +46,7 @@@ import org.junit.jupiter.api.Test public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { // @formatter:off -- private static final String clientConfiguration = ++ public static final String clientConfiguration = "["+ " {"+ " \"isDefault\": true,"+ @@@ -161,24 -163,19 +161,24 @@@ ScanServerIT.ingest(client, tableName, 10, 10, 10, "colf", true); assertEquals(100, additionalIngest1); - // A a scan server for resource group GROUP1 - // Bump the number of scan serves that can run to start the GROUP1 scan server - getCluster().getConfig().setNumScanServers(2); - getCluster()._exec(ScanServer.class, ServerType.SCAN_SERVER, Map.of(), - new String[] {"-g", "GROUP1"}); - Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 2); - Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( - (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) - == true); - Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream() - .anyMatch((p) -> p.getSecond().equals("GROUP1")) == true); ++ // Add a scan server for resource group GROUP1 + getCluster().getConfig().getClusterServerConfiguration() + .addScanServerResourceGroup("GROUP1", 1); + getCluster().getClusterControl().start(ServerType.SCAN_SERVER); + Wait.waitFor( + () -> getCluster().getServerContext().getServerPaths() + .getScanServer(ResourceGroupPredicate.ANY, AddressSelector.all(), true).size() == 2, + 30_000); + Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() + .getScanServer(ResourceGroupPredicate.DEFAULT_RG_ONLY, AddressSelector.all(), true) + .size() == 1); + Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() + .getScanServer(ResourceGroupPredicate.exact(ResourceGroupId.of("GROUP1")), + AddressSelector.all(), true) + .size() == 1); scanner.setExecutionHints(Map.of("scan_type", "use_group1")); - assertEquals(ingestedEntryCount + additionalIngest1, Iterables.size(scanner), + assertEquals(ingestedEntryCount + additionalIngest1, scanner.stream().count(), "The scan server scanner should have seen all ingested and flushed entries"); // if scanning against tserver would see the following, but should not on scan server
