This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f527270c90 Fixed ScanServerGroupConfigurationIT (#6176)
f527270c90 is described below
commit f527270c9087501a8cd5b15b5e3e459c5c5d5f3b
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 5 14:51:18 2026 -0500
Fixed ScanServerGroupConfigurationIT (#6176)
ScanServerGroupConfigurationIT was failing because the
ConfigurableScanServerSelector only checks for new ScanServers
at 5 second intervals and the test was doing a lookup too
quickly. The fix for the test is to just wait 5 seconds.
This change includes adding trace logging into the
ConfigurableScanServerSelector for better debugging
and formatting some of the large JSON blocks using
Java text blocks.
Closes #6154
---
.../accumulo/core/clientImpl/ThriftScanner.java | 9 ++--
.../spi/scan/ConfigurableScanServerSelector.java | 59 +++++++++++++++++++---
.../test/ScanServerGroupConfigurationIT.java | 4 ++
test/src/main/resources/log4j2-test.properties | 3 ++
4 files changed, 65 insertions(+), 10 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index c45b9322dd..4e07195355 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -448,7 +448,7 @@ public final class ThriftScanner {
delay = actions.getDelay();
scanState.busyTimeout = Duration.ZERO;
log.trace("For tablet {} scan server selector chose tablet_server:
{}", loc.getExtent(),
- addr);
+ addr.serverAddress);
} else {
log.trace(
"For tablet {} scan server selector chose tablet_server, but the
tablet is not currently hosted",
@@ -873,9 +873,10 @@ public final class ThriftScanner {
+ addr.getExtent().tableId());
if (log.isTraceEnabled()) {
- String msg = "Starting scan server=" + addr.serverAddress + "
tablet=" + addr.getExtent()
- + " range=" + scanState.range + " ssil=" +
scanState.serverSideIteratorList + " ssio="
- + scanState.serverSideIteratorOptions + " context=" +
scanState.classLoaderContext;
+ String msg = "Starting scan server=" + addr.serverAddress + " type="
+ addr.serverType
+ + " tablet=" + addr.getExtent() + " range=" + scanState.range +
" ssil="
+ + scanState.serverSideIteratorList + " ssio=" +
scanState.serverSideIteratorOptions
+ + " context=" + scanState.classLoaderContext;
log.trace("tid={} {}", Thread.currentThread().getId(), msg);
timer = Timer.startNew();
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
index 3b4c05840b..0f1bee6f11 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
@@ -40,6 +40,8 @@ import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
@@ -173,11 +175,35 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
*/
public class ConfigurableScanServerSelector implements ScanServerSelector {
- public static final String PROFILES_DEFAULT =
"[{'isDefault':true,'maxBusyTimeout':'5m',"
- + "'busyTimeoutMultiplier':8, 'scanTypeActivations':[], "
- + "'attemptPlans':[{'servers':'3', 'busyTimeout':'33ms', 'salt':'one'},"
- + "{'servers':'13', 'busyTimeout':'33ms', 'salt':'two'},"
- + "{'servers':'100%', 'busyTimeout':'33ms'}]}]";
+ private static final Logger LOG =
LoggerFactory.getLogger(ConfigurableScanServerSelector.class);
+
+ public static final String PROFILES_DEFAULT = """
+ [
+ {
+ "isDefault": true,
+ "maxBusyTimeout": "5m",
+ "busyTimeoutMultiplier": 8,
+ "scanTypeActivations": [
+ ],
+ "attemptPlans": [
+ {
+ "servers": "3",
+ "busyTimeout": "33ms",
+ "salt": "one"
+ },
+ {
+ "servers": "13",
+ "busyTimeout": "33ms",
+ "salt": "two"
+ },
+ {
+ "servers": "100%",
+ "busyTimeout": "33ms"
+ }
+ ]
+ }
+ ]
+ """;
private Supplier<Collection<ScanServerInfo>> serverSupplier;
private Map<String,Profile> profiles;
@@ -237,6 +263,11 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
return parsedBusyTimeout;
}
+ @Override
+ public String toString() {
+ return "AttemptPlan [servers=" + servers + ", busyTimeout=" +
busyTimeout + ", salt=" + salt
+ + "]";
+ }
}
@SuppressFBWarnings(value = {"NP_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD",
"UWF_UNWRITTEN_FIELD"},
@@ -300,6 +331,15 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
List<AttemptPlan> getAttemptPlans() {
return attemptPlans;
}
+
+ @Override
+ public String toString() {
+ return "Profile [attemptPlans=" + attemptPlans + ", scanTypeActivations="
+ + scanTypeActivations + ", isDefault=" + isDefault + ",
busyTimeoutMultiplier="
+ + busyTimeoutMultiplier + ", maxBusyTimeout=" + maxBusyTimeout + ",
group=" + group
+ + ", timeToWaitForScanServers=" + timeToWaitForScanServers + "]";
+ }
+
}
private void parseProfiles(Map<String,String> options) {
@@ -371,7 +411,9 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
Preconditions.checkArgument(diff.isEmpty(), "Unknown options %s", diff);
- parseProfiles(params.getOptions());
+ parseProfiles(opts);
+
+ LOG.trace("init, default profile = {}, other profiles: {}",
defaultProfile, profiles);
}
@Override
@@ -383,7 +425,9 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
if (scanType != null) {
profile = profiles.getOrDefault(scanType, defaultProfile);
+ LOG.trace("Found profile for scan type {}: {}", scanType, profile);
} else {
+ LOG.trace("scan_type not set, using default profile");
profile = defaultProfile;
}
@@ -411,6 +455,8 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
if
(rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).isEmpty()) {
// there are no scan servers so fall back to the tablet server
+ LOG.trace("No scan servers for group {}, falling back to tablet servers",
+ profile.getGroupId());
return new ScanServerSelections() {
@Override
public String getScanServer(TabletId tabletId) {
@@ -435,6 +481,7 @@ public class ConfigurableScanServerSelector implements
ScanServerSelector {
Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxAttempts));
+ LOG.trace("Returning servers to use: {}", serversToUse);
return new ScanServerSelections() {
@Override
public String getScanServer(TabletId tabletId) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
index e2ebe4b0db..3fcbaa391a 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
@@ -178,6 +178,10 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
AddressSelector.all(), true)
.size() == 1);
+ // ConfigurableScanServerSelector will only look for new scan servers
+ // after 5 seconds since it looked the last time to reduce the load
+ // on ZooKeeper.
+ Thread.sleep(5_000);
scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
assertEquals(ingestedEntryCount + additionalIngest1,
scanner.stream().count(),
"The scan server scanner should have seen all ingested and flushed
entries");
diff --git a/test/src/main/resources/log4j2-test.properties
b/test/src/main/resources/log4j2-test.properties
index 3cd1751362..a43bdd6346 100644
--- a/test/src/main/resources/log4j2-test.properties
+++ b/test/src/main/resources/log4j2-test.properties
@@ -160,5 +160,8 @@ logger.44.level = trace
logger.45.name = org.apache.accumulo.server.util.checkCommand.CheckRunner
logger.45.level = trace
+logger.46.name = org.apache.accumulo.core.spi.scan
+logger.46.level = info
+
rootLogger.level = debug
rootLogger.appenderRef.console.ref = STDOUT