This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 4229f84376 FATE config changes (#5861)
4229f84376 is described below
commit 4229f843767492bcaaa029c3b227f026f8c729c8
Author: Kevin Rathbun <[email protected]>
AuthorDate: Fri Sep 5 17:03:01 2025 -0400
FATE config changes (#5861)
* FATE config changes
changes the FATE config structure
From:
* being a JSON object where each key is a comma-separated list of FATE operations
and each value is the pool size for those operations
To:
* being a JSON object where each key is a user-defined pool name and each value is a
JSON object with a single key/value pair, where the key is a comma-separated list of FATE
operations and the value is the pool size for those operations
For example:
* {"SOME_OPS": 4, "REMAINING_OPS": 10}
Could become:
* {"pool1": {"SOME_OPS": 4}, "pool2": {"REMAINING_OPS": 10}}
This was done so that metrics can be maintained for these pools without
including the (potentially very long) list of FATE operations, which could be
an issue for some monitoring systems. It also shortens some log messages.
Instead of the FateExecutor objects only being identifiable by their assigned
FATE ops, they are now given the name as defined in the config which can be
used to identify them as well.
Previously:
* metrics tags would include all the FATE operations for that pool, but
these long tag values could be an issue for some monitoring systems
* the pool name would be "accumulo.pool.manager.fate.[user/meta].[THE OPS]"
* the work finder thread for the pool would be named
"fate.work.finder.[user/meta].[THE OPS]"
Now:
* metrics tags include the name instead of the entire list of operations
(tag looks like "accumulo.pool.manager.fate.[user/meta].[THE NAME]")
* the pool name is "accumulo.pool.manager.fate.[user/meta].[THE NAME]"
* the work finder thread for the pool is named
"fate.work.finder.[user/meta].[THE NAME]"
closes #5836
---
.../org/apache/accumulo/core/conf/Property.java | 40 ++--
.../apache/accumulo/core/conf/PropertyType.java | 59 ++++--
.../java/org/apache/accumulo/core/fate/Fate.java | 44 ++--
.../apache/accumulo/core/fate/FateExecutor.java | 35 +--
.../accumulo/core/fate/FateExecutorMetrics.java | 26 +--
.../org/apache/accumulo/core/metrics/Metric.java | 8 +-
.../accumulo/core/conf/PropertyTypeTest.java | 101 ++++-----
.../test/fate/FateExecutionOrderITBase.java | 4 +-
.../org/apache/accumulo/test/fate/FateITBase.java | 4 +-
.../accumulo/test/fate/FatePoolsWatcherITBase.java | 236 +++++++++++++--------
.../apache/accumulo/test/fate/FateTestUtil.java | 24 ++-
.../org/apache/accumulo/test/fate/FlakyFate.java | 10 +-
.../accumulo/test/fate/MultipleStoresITBase.java | 4 +-
.../apache/accumulo/test/fate/SlowFateSplit.java | 8 +-
.../apache/accumulo/test/metrics/MetricsIT.java | 88 ++++----
15 files changed, 386 insertions(+), 305 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index d3c60e8508..adcfb080f3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -457,30 +457,30 @@ public enum Property {
PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper
to update interval.",
"1.9.3"),
MANAGER_FATE_USER_CONFIG("manager.fate.user.config",
-
"{\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,"
+
"{'general':{'TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,"
+
"NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER,"
+
"TABLE_BULK_IMPORT2,TABLE_COMPACT,TABLE_CANCEL_COMPACT,TABLE_MERGE,TABLE_DELETE_RANGE,"
- + "TABLE_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT,SYSTEM_MERGE\":
4,"
- + "\"COMMIT_COMPACTION\": 4,\"SYSTEM_SPLIT\": 4}",
+ + "TABLE_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT,SYSTEM_MERGE':
4}, "
+ + "'commit':{'COMMIT_COMPACTION': 4}, 'split':{'SYSTEM_SPLIT':
4}}".replace("'", "\""),
PropertyType.FATE_USER_CONFIG,
"The number of threads used to run fault-tolerant executions (FATE) on
user"
- + "tables. These are primarily table operations like merge. Each
key/value "
- + "of the provided JSON corresponds to one thread pool. Each key is
a list of one or "
- + "more FATE operations and each value is the number of threads that
will be assigned "
- + "to the pool.",
+ + "tables. These are primarily table operations like merge. The
property value is JSON. "
+ + "Each key is the name of the pool (can be assigned any string).
Each value is a JSON "
+ + "object (with a single key/value) whose key is a comma-separated
string list of "
+ + "operations and whose value is a pool size for those operations.",
"4.0.0"),
MANAGER_FATE_META_CONFIG("manager.fate.meta.config",
-
"{\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,"
+
"{'general':{'TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,"
+
"NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER,"
+
"TABLE_BULK_IMPORT2,TABLE_COMPACT,TABLE_CANCEL_COMPACT,TABLE_MERGE,TABLE_DELETE_RANGE,"
- + "TABLE_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT,SYSTEM_MERGE\":
4,"
- + "\"COMMIT_COMPACTION\": 4,\"SYSTEM_SPLIT\": 4}",
+ + "TABLE_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT,SYSTEM_MERGE':
4}, "
+ + "'commit':{'COMMIT_COMPACTION': 4}, 'split':{'SYSTEM_SPLIT':
4}}".replace("'", "\""),
PropertyType.FATE_META_CONFIG,
- "The number of threads used to run fault-tolerant executions (FATE) on
Accumulo"
- + "system tables. These are primarily table operations like merge.
Each key/value "
- + "of the provided JSON corresponds to one thread pool. Each key is
a list of one or "
- + "more FATE operations and each value is the number of threads that
will be assigned "
- + "to the pool.",
+ "The number of threads used to run fault-tolerant executions (FATE) on
Accumulo system"
+ + "tables. These are primarily table operations like merge. The
property value is JSON. "
+ + "Each key is the name of the pool (can be assigned any string).
Each value is a JSON "
+ + "object (with a single key/value) whose key is a comma-separated
string list of "
+ + "operations and whose value is a pool size for those operations.",
"4.0.0"),
@Deprecated(since = "4.0.0")
MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64",
@@ -493,10 +493,12 @@ public enum Property {
"1.4.3"),
MANAGER_FATE_IDLE_CHECK_INTERVAL("manager.fate.idle.check.interval", "60m",
PropertyType.TIMEDURATION,
- "The interval at which to check if the number of idle Fate threads has
consistently been zero."
- + " The way this is checked is an approximation. Logs a warning in
the Manager log to change"
- + " MANAGER_FATE_USER_CONFIG or MANAGER_FATE_META_CONFIG. A value
less than a minute disables"
- + " this check and has a maximum value of 60m.",
+ String.format(
+ "The interval at which to check if the number of idle Fate threads
has consistently been"
+ + " zero. The way this is checked is an approximation. Logs a
warning in the Manager"
+ + " log to change %s or %s. A value less than a minute disables
this check and has a"
+ + " maximum value of 60m.",
+ MANAGER_FATE_USER_CONFIG.getKey(),
MANAGER_FATE_META_CONFIG.getKey()),
"4.0.0"),
MANAGER_STATUS_THREAD_POOL_SIZE("manager.status.threadpool.size", "0",
PropertyType.COUNT,
"The number of threads to use when fetching the tablet server status for
balancing. Zero "
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index 58962a8048..1af98f7b72 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -484,44 +485,66 @@ public enum PropertyType {
@Override
public boolean test(String s) {
- final Set<Fate.FateOperation> seenFateOps;
+ final Set<Fate.FateOperation> seenFateOps = new HashSet<>();
+ final int maxPoolNameLen = 64;
try {
final var json = JsonParser.parseString(s).getAsJsonObject();
- seenFateOps = new HashSet<>();
for (var entry : json.entrySet()) {
- var key = entry.getKey();
- var val = entry.getValue().getAsInt();
- if (val <= 0) {
+ var poolName = entry.getKey();
+
+ if (poolName.length() > maxPoolNameLen) {
+ log.warn(
+ "Unexpected property value {} for {}. Configured name {} is
too long (> {} characters). Property was unchanged",
+ s, name, poolName, maxPoolNameLen);
+ return false;
+ }
+
+ var poolConfigSet = entry.getValue().getAsJsonObject().entrySet();
+ if (poolConfigSet.size() != 1) {
+ log.warn(
+ "Unexpected property value {} for {}. Expected one entry for
{} but saw {}. Property was unchanged",
+ s, name, poolName, poolConfigSet.size());
+ return false;
+ }
+
+ var poolConfig = poolConfigSet.iterator().next();
+
+ var poolSize = poolConfig.getValue().getAsInt();
+ if (poolSize <= 0) {
log.warn(
- "Invalid entry {} in {}. Must be a valid thread pool size.
Property was unchanged.",
- entry, name);
+ "Unexpected property value {} for {}. Must be a valid thread
pool size (>0), saw {}. Property was unchanged",
+ s, name, poolSize);
return false;
}
- var fateOpsStrArr = key.split(",");
+
+ var fateOpsStrArr = poolConfig.getKey().split(",");
for (String fateOpStr : fateOpsStrArr) {
Fate.FateOperation fateOp = Fate.FateOperation.valueOf(fateOpStr);
- if (seenFateOps.contains(fateOp)) {
- log.warn("Duplicate fate operation {} seen in {}. Property was
unchanged.", fateOp,
- name);
+ if (!seenFateOps.add(fateOp)) {
+ log.warn(
+ "Unexpected property value {} for {}. Duplicate fate
operation {} seen. Property was unchanged",
+ s, name, fateOp);
return false;
}
- seenFateOps.add(fateOp);
}
}
} catch (Exception e) {
- log.warn("Exception from attempting to set {}. Property was
unchanged.", name, e);
+ log.warn("Unexpected property value {} for {}. Exception occurred.
Property was unchanged",
+ s, name, e);
return false;
}
- var allFateOpsSeen = allFateOps.equals(seenFateOps);
- if (!allFateOpsSeen) {
+ if (!allFateOps.equals(seenFateOps)) {
log.warn(
- "Not all fate operations found in {}. Expected to see {} but saw
{}. Property was unchanged.",
- name, allFateOps, seenFateOps);
+ "Unexpected property value {} for {}. Not all fate operations
found. Expected to see {} but saw {}. Property was unchanged",
+ s, name, allFateOps.stream().sorted().collect(Collectors.toList()),
+ seenFateOps.stream().sorted().collect(Collectors.toList()));
+ return false;
}
- return allFateOpsSeen;
+
+ return true;
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 87c237f2ad..e928cbd154 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -29,6 +29,7 @@ import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.META_DEAD_RE
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.USER_DEAD_RESERVATION_CLEANER_POOL;
import java.time.Duration;
+import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
@@ -182,8 +183,10 @@ public class Fate<T> {
while (fateExecutorsIter.hasNext()) {
var fateExecutor = fateExecutorsIter.next();
- // if this fate executors set of fate ops is no longer present in
the config...
- if (!poolConfigs.containsKey(fateExecutor.getFateOps())) {
+ // if this fate executors set of fate ops is no longer present in
the config OR
+ // this fate executor was renamed in the config
+ if (!poolConfigs.containsKey(fateExecutor.getFateOps()) ||
!poolConfigs
+
.get(fateExecutor.getFateOps()).getKey().equals(fateExecutor.getName())) {
if (!fateExecutor.isShutdown()) {
log.debug(
"[{}] The config for {} has changed invalidating {}.
Gracefully shutting down "
@@ -206,14 +209,16 @@ public class Fate<T> {
// config changes have started shutdown or finished shutdown. Now create
any new replacement
// FateExecutors needed
for (var poolConfig : poolConfigs.entrySet()) {
- var configFateOps = poolConfig.getKey();
- var configPoolSize = poolConfig.getValue();
+ Set<FateOperation> fateOps = poolConfig.getKey();
+ Map.Entry<String,Integer> fateExecNameAndPoolSize =
poolConfig.getValue();
+ String fateExecutorName = fateExecNameAndPoolSize.getKey();
+ int poolSize = fateExecNameAndPoolSize.getValue();
synchronized (fateExecutors) {
- if (fateExecutors.stream().map(FateExecutor::getFateOps)
- .noneMatch(fo -> fo.equals(configFateOps))) {
- log.debug("[{}] Adding FateExecutor for {}", store.type(),
configFateOps);
- fateExecutors
- .add(new FateExecutor<>(Fate.this, environment, configFateOps,
configPoolSize));
+ if (fateExecutors.stream().noneMatch(
+ fe -> fe.getFateOps().equals(fateOps) &&
fe.getName().equals(fateExecutorName))) {
+ log.debug("[{}] Adding FateExecutor for {}", store.type(),
fateOps);
+ fateExecutors.add(
+ new FateExecutor<>(Fate.this, environment, fateOps, poolSize,
fateExecutorName));
}
}
}
@@ -276,23 +281,26 @@ public class Fate<T> {
/**
* Returns a map of the current pool configurations as set in the given
config. Each key is a set
- * of fate operations and each value is an integer for the number of threads
assigned to work
- * those fate operations.
+ * of fate operations and each value is a map entry with key = fate executor
name and value = pool
+ * size
*/
@VisibleForTesting
- public static Map<Set<FateOperation>,Integer>
getPoolConfigurations(AccumuloConfiguration conf,
- FateInstanceType type) {
- Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
+ public static Map<Set<FateOperation>,Map.Entry<String,Integer>>
+ getPoolConfigurations(AccumuloConfiguration conf, FateInstanceType type)
{
+ Map<Set<FateOperation>,Map.Entry<String,Integer>> poolConfigs = new
HashMap<>();
final var json =
JsonParser.parseString(conf.get(getFateConfigProp(type))).getAsJsonObject();
for (var entry : json.entrySet()) {
- var key = entry.getKey();
- var val = entry.getValue().getAsInt();
- var fateOpsStrArr = key.split(",");
+ String fateExecutorName = entry.getKey();
+ var poolConfig =
entry.getValue().getAsJsonObject().entrySet().iterator().next();
+ String fateOpsStr = poolConfig.getKey();
+ int poolSize = poolConfig.getValue().getAsInt();
+ String[] fateOpsStrArr = fateOpsStr.split(",");
Set<FateOperation> fateOpsSet =
Arrays.stream(fateOpsStrArr).map(FateOperation::valueOf)
.collect(Collectors.toCollection(TreeSet::new));
- poolConfigs.put(fateOpsSet, val);
+ poolConfigs.put(fateOpsSet,
+ new AbstractMap.SimpleImmutableEntry<>(fateExecutorName, poolSize));
}
return poolConfigs;
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index 9710981b09..a912fa5940 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
@@ -75,6 +74,7 @@ public class FateExecutor<T> {
private final Thread workFinder;
private final TransferQueue<FateId> workQueue;
private final AtomicInteger idleWorkerCount;
+ private final String name;
private final String poolName;
private final ThreadPoolExecutor transactionExecutor;
private final Set<TransactionRunner> runningTxRunners;
@@ -82,26 +82,26 @@ public class FateExecutor<T> {
private final ConcurrentLinkedQueue<Integer> idleCountHistory = new
ConcurrentLinkedQueue<>();
private final FateExecutorMetrics<T> fateExecutorMetrics;
- public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation>
fateOps, int poolSize) {
+ public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation>
fateOps, int poolSize,
+ String name) {
final FateInstanceType type = fate.getStore().type();
final String typeStr = type.name().toLowerCase();
- final String operatesOn = fateOps.stream().map(fo ->
fo.name().toLowerCase()).sorted()
- .collect(Collectors.joining("."));
- final String transactionRunnerPoolName =
- ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + typeStr + "." +
operatesOn;
- final String workFinderThreadName = "fate.work.finder." + typeStr + "." +
operatesOn;
+ final String poolName =
+ ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + typeStr + "." +
name;
+ final String workFinderThreadName = "fate.work.finder." + typeStr + "." +
name;
this.fate = fate;
this.environment = environment;
this.fateOps = Collections.unmodifiableSet(fateOps);
this.workQueue = new LinkedTransferQueue<>();
this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
- this.poolName = transactionRunnerPoolName;
- this.transactionExecutor = ThreadPools.getServerThreadPools()
-
.getPoolBuilder(transactionRunnerPoolName).numCoreThreads(poolSize).build();
+ this.name = name;
+ this.poolName = poolName;
+ this.transactionExecutor =
ThreadPools.getServerThreadPools().getPoolBuilder(poolName)
+ .numCoreThreads(poolSize).build();
this.idleWorkerCount = new AtomicInteger(0);
this.fateExecutorMetrics =
- new FateExecutorMetrics<>(type, operatesOn, runningTxRunners,
idleWorkerCount);
+ new FateExecutorMetrics<>(type, poolName, runningTxRunners,
idleWorkerCount);
this.workFinder = Threads.createCriticalThread(workFinderThreadName, new
WorkFinder());
this.workFinder.start();
@@ -112,9 +112,10 @@ public class FateExecutor<T> {
* grew, stop TransactionRunners if the pool shrunk, and potentially suggest
resizing the pool if
* the load is consistently high.
*/
- protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer>
poolConfigs,
+ protected void resizeFateExecutor(
+ Map<Set<Fate.FateOperation>,Map.Entry<String,Integer>> poolConfigs,
long idleCheckIntervalMillis) {
- final int configured = poolConfigs.get(fateOps);
+ final int configured = poolConfigs.get(fateOps).getValue();
ThreadPools.resizePool(transactionExecutor, () -> configured, poolName);
synchronized (runningTxRunners) {
final int running = runningTxRunners.size();
@@ -203,6 +204,10 @@ public class FateExecutor<T> {
}
}
+ protected String getName() {
+ return name;
+ }
+
private int getIdleWorkerCount() {
// This could call workQueue.getWaitingConsumerCount() if other code use
poll with timeout
return idleWorkerCount.get();
@@ -608,7 +613,7 @@ public class FateExecutor<T> {
@Override
public String toString() {
- return
String.format("FateExecutor:{FateOps=%s,PoolSize:%s,TransactionRunners:%s}",
fateOps,
- runningTxRunners.size(), runningTxRunners);
+ return
String.format("FateExecutor:{FateOps=%s,Name=%s,PoolSize:%s,TransactionRunners:%s}",
+ fateOps, name, runningTxRunners.size(), runningTxRunners);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
index fec087519f..4edc70fe7a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
@@ -32,18 +32,18 @@ import io.micrometer.core.instrument.MeterRegistry;
public class FateExecutorMetrics<T> implements MetricsProducer {
private static final Logger log =
LoggerFactory.getLogger(FateExecutorMetrics.class);
private final FateInstanceType type;
- private final String operatesOn;
+ private final String poolName;
private final Set<FateExecutor<T>.TransactionRunner> runningTxRunners;
private final AtomicInteger idleWorkerCount;
private MeterRegistry registry;
private State state;
public static final String INSTANCE_TYPE_TAG_KEY = "instanceType";
- public static final String OPS_ASSIGNED_TAG_KEY = "ops.assigned";
+ public static final String POOL_NAME_TAG_KEY = "pool.name";
- protected FateExecutorMetrics(FateInstanceType type, String operatesOn,
+ protected FateExecutorMetrics(FateInstanceType type, String poolName,
Set<FateExecutor<T>.TransactionRunner> runningTxRunners, AtomicInteger
idleWorkerCount) {
this.type = type;
- this.operatesOn = operatesOn;
+ this.poolName = poolName;
this.runningTxRunners = runningTxRunners;
this.state = State.UNREGISTERED;
this.idleWorkerCount = idleWorkerCount;
@@ -55,12 +55,12 @@ public class FateExecutorMetrics<T> implements
MetricsProducer {
if (state == State.UNREGISTERED) {
Gauge.builder(Metric.FATE_OPS_THREADS_TOTAL.getName(),
runningTxRunners::size)
.description(Metric.FATE_OPS_THREADS_TOTAL.getDescription())
- .tag(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase())
- .tag(OPS_ASSIGNED_TAG_KEY, operatesOn).register(registry);
+ .tag(INSTANCE_TYPE_TAG_KEY,
type.name().toLowerCase()).tag(POOL_NAME_TAG_KEY, poolName)
+ .register(registry);
Gauge.builder(Metric.FATE_OPS_THREADS_INACTIVE.getName(),
idleWorkerCount::get)
.description(Metric.FATE_OPS_THREADS_INACTIVE.getDescription())
- .tag(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase())
- .tag(OPS_ASSIGNED_TAG_KEY, operatesOn).register(registry);
+ .tag(INSTANCE_TYPE_TAG_KEY,
type.name().toLowerCase()).tag(POOL_NAME_TAG_KEY, poolName)
+ .register(registry);
registered(registry);
}
@@ -70,7 +70,7 @@ public class FateExecutorMetrics<T> implements
MetricsProducer {
// noop if metrics were never registered or have already been cleared
if (state == State.REGISTERED) {
var threadsTotalMeter =
registry.find(Metric.FATE_OPS_THREADS_TOTAL.getName())
- .tags(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase(),
OPS_ASSIGNED_TAG_KEY, operatesOn)
+ .tags(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase(),
POOL_NAME_TAG_KEY, poolName)
.meter();
// meter will be null if it could not be found, ignore IDE warning if
one is seen
if (threadsTotalMeter == null) {
@@ -78,21 +78,21 @@ public class FateExecutorMetrics<T> implements
MetricsProducer {
"Tried removing meter{name: {} tags: {}={}, {}={}} from the
registry, but did "
+ "not find it.",
Metric.FATE_OPS_THREADS_TOTAL.getName(), INSTANCE_TYPE_TAG_KEY,
- type.name().toLowerCase(), OPS_ASSIGNED_TAG_KEY, operatesOn);
+ type.name().toLowerCase(), POOL_NAME_TAG_KEY, poolName);
} else {
registry.remove(threadsTotalMeter);
}
var threadsInactiveMeter =
registry.find(Metric.FATE_OPS_THREADS_INACTIVE.getName())
- .tags(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase(),
OPS_ASSIGNED_TAG_KEY, operatesOn)
+ .tags(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase(),
POOL_NAME_TAG_KEY, poolName)
.meter();
// meter will be null if it could not be found, ignore IDE warning if
one is seen
if (threadsInactiveMeter == null) {
log.error(
"Tried removing meter{name: {} tags: {}={}, {}={}} from the
registry, but did "
+ "not find it.",
- Metric.FATE_OPS_THREADS_TOTAL.getName(), INSTANCE_TYPE_TAG_KEY,
- type.name().toLowerCase(), OPS_ASSIGNED_TAG_KEY, operatesOn);
+ Metric.FATE_OPS_THREADS_INACTIVE.getName(), INSTANCE_TYPE_TAG_KEY,
+ type.name().toLowerCase(), POOL_NAME_TAG_KEY, poolName);
} else {
registry.remove(threadsInactiveMeter);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index 4876c6a160..8fefa5d76a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -110,14 +110,14 @@ public enum Metric {
+ "(e.g., state=new, state=in.progress, state=failed, etc.).",
MetricDocSection.FATE),
FATE_OPS_THREADS_INACTIVE("accumulo.fate.ops.threads.inactive",
MetricType.GAUGE,
- "Keeps track of the number of idle threads (not working on a fate
operation) in the thread pool assigned to work on the operations as shown in
the "
- + FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY
+ "Keeps track of the number of idle threads (not working on a fate
operation) in the thread "
+ + "pool. The pool name can be found in the " +
FateExecutorMetrics.POOL_NAME_TAG_KEY
+ " tag. The fate instance type can be found in the "
+ FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY + " tag.",
MetricDocSection.FATE),
FATE_OPS_THREADS_TOTAL("accumulo.fate.ops.threads.total", MetricType.GAUGE,
- "Keeps track of the total number of threads in the thread pool assigned
to work on the operations as shown in the "
- + FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY
+ "Keeps track of the total number of threads in the thread pool. The pool
name can be found in the "
+ + FateExecutorMetrics.POOL_NAME_TAG_KEY
+ " tag. The fate instance type can be found in the "
+ FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY + " tag.",
MetricDocSection.FATE),
diff --git
a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
index 6343aefbe9..86a1037f85 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
@@ -238,77 +238,56 @@ public class PropertyTypeTest extends WithTestNames {
@Test
public void testTypeFATE_USER_CONFIG() {
- var allUserFateOps = Fate.FateOperation.getAllUserFateOps();
- int poolSize1 = allUserFateOps.size() / 2;
- var validPool1Ops =
-
allUserFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(","));
- var validPool2Ops =
-
allUserFateOps.stream().map(Enum::name).skip(poolSize1).collect(Collectors.joining(","));
- // should be valid: one pool for all ops, order should not matter, all ops
split across
- // multiple pools (note validated in the same order as described here)
- valid(
- "{\"" +
allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 10}",
- "{\"" + validPool2Ops + "," + validPool1Ops + "\": 10}",
- "{\"" + validPool1Ops + "\": 2, \"" + validPool2Ops + "\": 3}");
- // should be invalid: invalid json, null, missing FateOperation, pool size
of 0, pool size of
- // -1, invalid pool size, invalid key, same FateOperation repeated in a
different pool, invalid
- // FateOperation (note validated in the same order as described here)
- var invalidPool1Ops =
-
allUserFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(","));
- var invalidPool2Ops =
allUserFateOps.stream().map(Enum::name).skip(poolSize1 + 1)
- .collect(Collectors.joining(","));
- invalid("", null, "{\"" + invalidPool1Ops + "\": 2, \"" + invalidPool2Ops
+ "\": 3}",
- "{\"" +
allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\":
0}",
- "{\"" +
allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": -1}",
- "{\"" +
allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\":
x}",
- "{\"" +
allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(", "))
- + "\": 10}",
- "{\"" +
allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 10, \""
- +
allUserFateOps.stream().map(Enum::name).limit(1).collect(Collectors.joining(","))
- + "\": 10}",
- "{\"" +
allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(","))
- + ",INVALID_FATEOP\": 10}");
+ testFateConfig(Fate.FateOperation.getAllUserFateOps());
}
@Test
public void testTypeFATE_META_CONFIG() {
- var allMetaFateOps = Fate.FateOperation.getAllMetaFateOps();
- int poolSize1 = allMetaFateOps.size() / 2;
- var validPool1Ops =
-
allMetaFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(","));
- var validPool2Ops =
-
allMetaFateOps.stream().map(Enum::name).skip(poolSize1).collect(Collectors.joining(","));
+ testFateConfig(Fate.FateOperation.getAllMetaFateOps());
+ }
+
+ private void testFateConfig(Set<Fate.FateOperation> allOps) {
+ final int poolSize1 = allOps.size() / 2;
+ final var validPool1Ops =
+
allOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(","));
+ final var validPool2Ops =
+
allOps.stream().map(Enum::name).skip(poolSize1).collect(Collectors.joining(","));
+ final var allFateOpsStr =
allOps.stream().map(Enum::name).collect(Collectors.joining(","));
// should be valid: one pool for all ops, order should not matter, all ops
split across
// multiple pools (note validated in the same order as described here)
- valid(
- "{\"" +
allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 10}",
- "{\"" + validPool2Ops + "," + validPool1Ops + "\": 10}",
- "{\"" + validPool1Ops + "\": 2, \"" + validPool2Ops + "\": 3}");
+ valid(String.format("{'poolname':{'%s': 10}}", allFateOpsStr).replace("'",
"\""),
+ String.format("{'123abc':{'%s,%s': 10}}", validPool2Ops,
validPool1Ops).replace("'", "\""),
+ String.format("{'foo':{'%s': 2}, 'bar':{'%s': 3}}", validPool1Ops,
validPool2Ops)
+ .replace("'", "\""));
var invalidPool1Ops =
-
allMetaFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(","));
- var invalidPool2Ops =
allMetaFateOps.stream().map(Enum::name).skip(poolSize1 + 1)
- .collect(Collectors.joining(","));
+
allOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(","));
+ var invalidPool2Ops =
+ allOps.stream().map(Enum::name).skip(poolSize1 +
1).collect(Collectors.joining(","));
// should be invalid: invalid json, null, missing FateOperation, pool size
of 0, pool size of
// -1, invalid pool size, invalid key, same FateOperation repeated in a
different pool, invalid
- // FateOperation (note validated in the same order as described here)
- invalid("", null, "{\"" + invalidPool1Ops + "\": 2, \"" + invalidPool2Ops
+ "\": 3}",
- "{\"" +
allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\":
0}",
- "{\"" +
allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": -1}",
- "{\"" +
allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\":
x}",
- "{\"" +
allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(", "))
- + "\": 10}",
- "{\"" +
allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 10, \""
- +
allMetaFateOps.stream().map(Enum::name).limit(1).collect(Collectors.joining(","))
- + "\": 10}",
- "{\"" +
allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(","))
- + ",INVALID_FATEOP\": 10}");
+ // FateOperation, long name, repeated name, more than one key/val for
single pool name
+ // (note validated in the same order as described here)
+ invalid("", null,
+ String.format("{'name1':{'%s': 2}, 'name2':{'%s': 3}}",
invalidPool1Ops, invalidPool2Ops)
+ .replace("'", "\""),
+ String.format("{'foobar':{'%s': 0}}", allFateOpsStr).replace("'",
"\""),
+ String.format("{'foobar':{'%s': -1}}", allFateOpsStr).replace("'",
"\""),
+ String.format("{'foofoofoofoo':{'%s': x}}",
allFateOpsStr).replace("'", "\""),
+ String
+ .format("{'123':{'%s': 10}}",
+ allOps.stream().map(Enum::name).collect(Collectors.joining(",
")))
+ .replace("'", "\""),
+ String
+ .format("{'abc':{'%s': 10}, 'def':{'%s': 10}}", allFateOpsStr,
+
allOps.stream().map(Enum::name).limit(1).collect(Collectors.joining(",")))
+ .replace("'", "\""),
+ String.format("{'xyz':{'%s,INVALID_FATEOP': 10}}",
allFateOpsStr).replace("'", "\""),
+ String.format("{'%s':{'%s': 10}}", "x".repeat(100),
allFateOpsStr).replace("'", "\""),
+ String.format("{'name':{'%s':7}, 'name':{'%s':8}}", validPool1Ops,
validPool2Ops)
+ .replace("'", "\""),
+ String.format("{'xyz123':{'%s':9,'%s':8}}", validPool1Ops,
validPool2Ops).replace("'",
+ "\""));
}
@Test
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderITBase.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderITBase.java
index e33cad362a..ee3427fc2c 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderITBase.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderITBase.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
@@ -194,7 +195,8 @@ public abstract class FateExecutionOrderITBase extends
SharedMiniClusterBase
protected Fate<FeoTestEnv> initializeFate(AccumuloClient client,
FateStore<FeoTestEnv> store) {
return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "",
- FateTestUtil.createTestFateConfig(1), new
ScheduledThreadPoolExecutor(2));
+ FateTestUtil.updateFateConfig(new ConfigurationCopy(), 1,
"AllFateOps"),
+ new ScheduledThreadPoolExecutor(2));
}
private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) {
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
index d8393f853c..d965881c17 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
@@ -554,7 +555,8 @@ public abstract class FateITBase extends
SharedMiniClusterBase implements FateTe
protected Fate<TestEnv> initializeFate(FateStore<TestEnv> store) {
return new Fate<>(new TestEnv(), store, false, r -> r + "",
- FateTestUtil.createTestFateConfig(1), new
ScheduledThreadPoolExecutor(2));
+ FateTestUtil.updateFateConfig(new ConfigurationCopy(), 1,
"AllFateOps"),
+ new ScheduledThreadPoolExecutor(2));
}
protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId);
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
index 66b80d41dd..ae24acbfa0 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.Fate;
@@ -79,12 +78,12 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
protected void testIncrease1(FateStore<PoolResizeTestEnv> store,
ServerContext sctx)
throws Exception {
// Tests changing the config for the FATE thread pools from
- // {<half the FATE ops/SET1>}: 4 <-- FateExecutor1
- // {<other half/SET2>}: 5 <-- FateExecutor2
+ // SET1: {<half the FATE ops>: 4} <-- FateExecutor1
+ // SET2: {<other half>: 5} <-- FateExecutor2
// ---->
- // {<half the FATE ops/SET1>}: 10 <-- FateExecutor1
- // {<other half minus one/SET3>}: 9 <-- FateExecutor3
- // {<remaining FATE op/SET4>}: 8 <-- FateExecutor4
+ // SET1: {<half the FATE ops>: 10} <-- FateExecutor1
+ // SET3: {<other half minus one>: 9} <-- FateExecutor3
+ // SET4: {<remaining FATE op>: 8} <-- FateExecutor4
// This tests inc size of FATE thread pools for FateExecutors with
unchanged fate ops, stopping
// FateExecutors that are no longer valid (while ensuring none are stopped
while in progress on
// a transaction), and creating new FateExecutors as needed. Essentially,
FateExecutor1's pool
@@ -215,12 +214,14 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
protected void testIncrease2(FateStore<PoolResizeTestEnv> store,
ServerContext sctx) {
// Tests changing the config for the FATE thread pools from
- // {<All FATE ops>}: 2 <-- FateExecutor1
+ // AllFateOps: {<All FATE ops>: 2} <-- FateExecutor1
// ---->
- // {<All FATE ops>}: 3 <-- FateExecutor1
+ // AllFateOps: {<All FATE ops>: 3} <-- FateExecutor1
// when 3 transactions need to be worked on. Ensures after the config
change, the third tx
// is picked up.
- final ConfigurationCopy config = FateTestUtil.createTestFateConfig(2);
+ final String fateExecName = "AllFateOps";
+ final ConfigurationCopy config =
+ FateTestUtil.updateFateConfig(new ConfigurationCopy(), 2,
fateExecName);
final var env = new PoolResizeTestEnv();
final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r
-> r + "", config);
final int numWorkers = 2;
@@ -249,7 +250,7 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
assertEquals(numWorkers, fate.getTxRunnersActive(allFateOps));
// increase the pool size
- changeConfigIncTest2(config, newNumWorkers);
+ FateTestUtil.updateFateConfig(config, newNumWorkers, fateExecName);
// wait for the final tx to be picked up
Wait.waitFor(() -> env.numWorkers.get() == newNumWorkers);
@@ -291,12 +292,12 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
protected void testDecrease(FateStore<PoolResizeTestEnv> store,
ServerContext sctx)
throws Exception {
// Tests changing the config for the FATE thread pools from
- // {<half the FATE ops/SET1>}: 4 <-- FateExecutor1
- // {<other half minus one/SET3>}: 5 <-- FateExecutor2
- // {<remaining FATE op/SET4>}: 6 <-- FateExecutor3
+ // SET1: {<half the FATE ops>: 4} <-- FateExecutor1
+ // SET3: {<other half minus one>: 5} <-- FateExecutor2
+ // SET4: {<remaining FATE op>: 6} <-- FateExecutor3
// ---->
- // {<half the FATE ops/SET1>}: 3 <-- FateExecutor1
- // {<other half/SET2>}: 2 <-- FateExecutor4
+ // SET1: {<half the FATE ops>: 3} <-- FateExecutor1
+ // SET2: {<other half>: 2} <-- FateExecutor4
// This tests dec size of FATE thread pools for FateExecutors with
unchanged fate ops, stopping
// FateExecutors that are no longer valid (while ensuring none are stopped
while in progress on
// a transaction), and creating new FateExecutors as needed. Essentially,
FateExecutor1's pool
@@ -429,7 +430,9 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
protected void testIdleCountHistory(FateStore<PoolResizeTestEnv> store,
ServerContext sctx)
throws Exception {
// Tests that a warning to increase pool size is logged when expected
- var config = configIdleHistoryTest();
+ var config = FateTestUtil.updateFateConfig(new ConfigurationCopy(), 2,
"AllFateOps");
+ config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "1m");
+
final var env = new PoolResizeTestEnv();
final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r
-> r + "", config);
try {
@@ -462,15 +465,15 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
protected void testFatePoolsPartitioning(FateStore<PoolResizeTestEnv> store,
ServerContext sctx)
throws Exception {
// Ensures FATE ops are correctly partitioned between the pools.
Configures 4 FateExecutors:
- // FateExecutor1 with 2 threads operating on 1/4 of FATE ops
- // FateExecutor2 with 3 threads operating on 1/4 of FATE ops
- // FateExecutor3 with 4 threads operating on 1/4 of FATE ops
- // FateExecutor4 with 5 threads operating on 1/4 of FATE ops
+ // pool1/FateExecutor1 with 2 threads operating on 1/4 of FATE ops
+ // pool2/FateExecutor2 with 3 threads operating on 1/4 of FATE ops
+ // pool3/FateExecutor3 with 4 threads operating on 1/4 of FATE ops
+ // pool4/FateExecutor4 with 5 threads operating on 1/4 of FATE ops
// Seeds:
- // 5 transactions on FateExecutor1
- // 6 transactions on FateExecutor2
- // 1 transactions on FateExecutor3
- // 4 transactions on FateExecutor4
+ // 5 transactions on pool1/FateExecutor1
+ // 6 transactions on pool2/FateExecutor2
+ // 1 transaction on pool3/FateExecutor3
+ // 4 transactions on pool4/FateExecutor4
// Ensures that we only see min(configured threads, transactions seeded)
ever running
// Also ensures that FateExecutors do not pick up any work that they
shouldn't
final int numThreadsPool1 = 2;
@@ -512,17 +515,19 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
final ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_USER_CONFIG,
- String.format("{\"%s\": %s, \"%s\": %s, \"%s\": %s, \"%s\": %s}",
+
String.format("{'pool1':{'%s':%d},'pool2':{'%s':%d},'pool3':{'%s':%d},'pool4':{'%s':%d}}",
userPool1.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool1,
userPool2.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool2,
userPool3.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool3,
-
userPool4.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool4));
+
userPool4.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool4)
+ .replace("'", "\""));
config.set(Property.MANAGER_FATE_META_CONFIG,
- String.format("{\"%s\": %s, \"%s\": %s, \"%s\": %s, \"%s\": %s}",
+
String.format("{'pool1':{'%s':%d},'pool2':{'%s':%d},'pool3':{'%s':%d},'pool4':{'%s':%d}}",
metaPool1.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool1,
metaPool2.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool2,
metaPool3.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool3,
-
metaPool4.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool4));
+
metaPool4.stream().map(Enum::name).collect(Collectors.joining(",")),
numThreadsPool4)
+ .replace("'", "\""));
config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "60m");
final boolean isUserStore = store.type() == FateInstanceType.USER;
@@ -629,84 +634,139 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
}
}
+ @Test
+ public void testFateExecutorRename() throws Exception {
+ executeTest(this::testFateExecutorRename);
+ }
+
+ protected void testFateExecutorRename(FateStore<PoolResizeTestEnv> store,
ServerContext sctx)
+ throws Exception {
+ // tests that attempting to rename a fate executor will cause it to be
shut down and a new one
+ // to be started
+
+ final var env = new PoolResizeTestEnv();
+ final int poolSize = 3;
+ final int newPoolSize = 5;
+ final var config =
+ FateTestUtil.updateFateConfig(new ConfigurationCopy(), poolSize,
"AllFateOps");
+ final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r
-> r + "", config);
+
+ try {
+ // start a single transaction
+ fate.seedTransaction(FateTestUtil.TEST_FATE_OP, fate.startTransaction(),
+ new PoolResizeTestRepo(), true, "testing");
+
+ // wait for the transaction to be worked on
+ Wait.waitFor(() -> env.numWorkers.get() == 1);
+ // wait for all transaction runners to be active/started
+ Wait.waitFor(() -> fate.getTotalTxRunnersActive() == poolSize);
+
+ // rename the fate executor
+ // the only reason for changing the pool size here is so that we can
tell that the old fate
+ // executor is shut down and the new one is started. Changing the pool
size does not by itself
+ // cause a shutdown
+ FateTestUtil.updateFateConfig(config, newPoolSize, "NewPoolName");
+
+ // newPoolSize for the newly created fate executor and 1 for the old
fate executor that has
+ // begun but not finished shutdown (needs to complete the transaction
it's working on)
+ Wait.waitFor(() -> fate.getTotalTxRunnersActive() == newPoolSize + 1);
+
+ // allow work to complete
+ env.isReadyLatch.countDown();
+
+ Wait.waitFor(() -> fate.getTotalTxRunnersActive() == newPoolSize);
+ // at this point, we are certain the old fate executor has completely
shut down
+ } catch (Throwable e) {
+ // If the finally block throws an exception then this exception will
never be seen so log it
+ // just in case.
+ log.error("Failure in test", e);
+ throw e;
+ } finally {
+ fate.shutdown(30, TimeUnit.SECONDS);
+ assertEquals(0, fate.getTotalTxRunnersActive());
+ }
+ }
+
private ConfigurationCopy initConfigIncTest1() {
- // {<half the FATE ops/SET1>}: 4
- // {<other half/SET2>}: 5
+ // SET1: {<half the FATE ops>: 4}
+ // SET2: {<other half>: 5}
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
- config.set(Property.MANAGER_FATE_USER_CONFIG, "{\""
- +
USER_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 4,\""
- +
USER_FATE_OPS_SET2.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 5}");
- config.set(Property.MANAGER_FATE_META_CONFIG, "{\""
- +
META_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 4,\""
- +
META_FATE_OPS_SET2.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 5}");
+ config.set(Property.MANAGER_FATE_USER_CONFIG,
+ String
+ .format("{'SET1':{'%s':%d},'SET2':{'%s':%d}}",
+
USER_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")), 4,
+
USER_FATE_OPS_SET2.stream().map(Enum::name).collect(Collectors.joining(",")), 5)
+ .replace("'", "\""));
+ config.set(Property.MANAGER_FATE_META_CONFIG,
+ String
+ .format("{'SET1':{'%s':%d},'SET2':{'%s':%d}}",
+
META_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")), 4,
+
META_FATE_OPS_SET2.stream().map(Enum::name).collect(Collectors.joining(",")), 5)
+ .replace("'", "\""));
config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "60m");
return config;
}
private void changeConfigIncTest1(ConfigurationCopy config) {
- // {<half the FATE ops/SET1>}: 10
- // {<other half minus one/SET3>}: 9
- // {<remaining FATE op/SET4>}: 8
- config.set(Property.MANAGER_FATE_USER_CONFIG, "{\""
- +
USER_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 10,"
- + "\"" +
USER_FATE_OPS_SET3.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 9,\"" +
USER_FATE_OPS_SET4.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 8}");
- config.set(Property.MANAGER_FATE_META_CONFIG, "{\""
- +
META_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 10,"
- + "\"" +
META_FATE_OPS_SET3.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 9,\"" +
META_FATE_OPS_SET4.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 8}");
- }
-
- private void changeConfigIncTest2(ConfigurationCopy config, int numThreads) {
- config.set(Property.MANAGER_FATE_USER_CONFIG, "{\"" +
Fate.FateOperation.getAllUserFateOps()
- .stream().map(Enum::name).collect(Collectors.joining(",")) + "\": " +
numThreads + "}");
- config.set(Property.MANAGER_FATE_META_CONFIG, "{\"" +
Fate.FateOperation.getAllMetaFateOps()
- .stream().map(Enum::name).collect(Collectors.joining(",")) + "\": " +
numThreads + "}");
+ // SET1: {<half the FATE ops>: 10}
+ // SET3: {<other half minus one>: 9}
+ // SET4: {<remaining FATE op>: 8}
+ config.set(Property.MANAGER_FATE_USER_CONFIG,
+ String
+ .format("{'SET1':{'%s':%d},'SET3':{'%s':%d},'SET4':{'%s':%d}}",
+
USER_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")),
10,
+
USER_FATE_OPS_SET3.stream().map(Enum::name).collect(Collectors.joining(",")), 9,
+
USER_FATE_OPS_SET4.stream().map(Enum::name).collect(Collectors.joining(",")), 8)
+ .replace("'", "\""));
+ config.set(Property.MANAGER_FATE_META_CONFIG,
+ String
+ .format("{'SET1':{'%s':%d},'SET3':{'%s':%d},'SET4':{'%s':%d}}",
+
META_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")),
10,
+
META_FATE_OPS_SET3.stream().map(Enum::name).collect(Collectors.joining(",")), 9,
+
META_FATE_OPS_SET4.stream().map(Enum::name).collect(Collectors.joining(",")), 8)
+ .replace("'", "\""));
}
private ConfigurationCopy initConfigDecTest() {
- // {<half the FATE ops/SET1>}: 4
- // {<other half minus one/SET3>}: 5
- // {<remaining FATE op/SET4>}: 6
+ // SET1: {<half the FATE ops>: 4}
+ // SET3: {<other half minus one>: 5}
+ // SET4: {<remaining FATE op>: 6}
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
- config.set(Property.MANAGER_FATE_USER_CONFIG, "{\""
- +
USER_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 4,"
- + "\"" +
USER_FATE_OPS_SET3.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 5,\"" +
USER_FATE_OPS_SET4.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 6}");
- config.set(Property.MANAGER_FATE_META_CONFIG, "{\""
- +
META_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 4,"
- + "\"" +
META_FATE_OPS_SET3.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 5,\"" +
META_FATE_OPS_SET4.stream().map(Enum::name).collect(Collectors.joining(","))
- + "\": 6}");
+ config.set(Property.MANAGER_FATE_USER_CONFIG,
+ String
+ .format("{'SET1':{'%s':%d},'SET3':{'%s':%d},'SET4':{'%s':%d}}",
+
USER_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")), 4,
+
USER_FATE_OPS_SET3.stream().map(Enum::name).collect(Collectors.joining(",")), 5,
+
USER_FATE_OPS_SET4.stream().map(Enum::name).collect(Collectors.joining(",")), 6)
+ .replace("'", "\""));
+ config.set(Property.MANAGER_FATE_META_CONFIG,
+ String
+ .format("{'SET1':{'%s':%d},'SET3':{'%s':%d},'SET4':{'%s':%d}}",
+
META_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")), 4,
+
META_FATE_OPS_SET3.stream().map(Enum::name).collect(Collectors.joining(",")), 5,
+
META_FATE_OPS_SET4.stream().map(Enum::name).collect(Collectors.joining(",")), 6)
+ .replace("'", "\""));
config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "60m");
return config;
}
private void changeConfigDecTest(ConfigurationCopy config) {
- // {<half the FATE ops/SET1>}: 3
- // {<other half/SET2>}: 2
- config.set(Property.MANAGER_FATE_USER_CONFIG, "{\""
- +
USER_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 3,\""
- +
USER_FATE_OPS_SET2.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 2}");
- config.set(Property.MANAGER_FATE_META_CONFIG, "{\""
- +
META_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 3,\""
- +
META_FATE_OPS_SET2.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 2}");
- }
-
- private AccumuloConfiguration configIdleHistoryTest() {
- ConfigurationCopy config = new ConfigurationCopy();
- config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
- config.set(Property.MANAGER_FATE_USER_CONFIG, "{\""
- +
ALL_USER_FATE_OPS.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 2}");
- config.set(Property.MANAGER_FATE_META_CONFIG, "{\""
- +
ALL_META_FATE_OPS.stream().map(Enum::name).collect(Collectors.joining(",")) +
"\": 2}");
- config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "1m");
- return config;
+ // SET1: {<half the FATE ops>: 3}
+ // SET2: {<other half>: 2}
+ config.set(Property.MANAGER_FATE_USER_CONFIG,
+ String
+ .format("{'SET1':{'%s':%d},'SET2':{'%s':%d}}",
+
USER_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")), 3,
+
USER_FATE_OPS_SET2.stream().map(Enum::name).collect(Collectors.joining(",")), 2)
+ .replace("'", "\""));
+ config.set(Property.MANAGER_FATE_META_CONFIG,
+ String
+ .format("{'SET1':{'%s':%d},'SET2':{'%s':%d}}",
+
META_FATE_OPS_SET1.stream().map(Enum::name).collect(Collectors.joining(",")), 3,
+
META_FATE_OPS_SET2.stream().map(Enum::name).collect(Collectors.joining(",")), 2)
+ .replace("'", "\""));
}
public static class PoolResizeTestRepo implements Repo<PoolResizeTestEnv> {
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateTestUtil.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateTestUtil.java
index 8d6c687f33..4608065bb3 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateTestUtil.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateTestUtil.java
@@ -93,17 +93,25 @@ public class FateTestUtil {
}
/**
- * Returns a config with all FATE operations assigned to a single pool of
size numThreads for both
- * USER and META FATE operations
+ * Updates and returns the given config with all FATE operations assigned to a single pool of
size "numThreads" for
+ * both USER and META FATE operations. The fate executor is given the name
"name".
*/
- public static ConfigurationCopy createTestFateConfig(int numThreads) {
- ConfigurationCopy config = new ConfigurationCopy();
+ public static ConfigurationCopy updateFateConfig(ConfigurationCopy config,
int numThreads,
+ String name) {
// this value isn't important, just needs to be set
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
- config.set(Property.MANAGER_FATE_USER_CONFIG, "{\"" +
Fate.FateOperation.getAllUserFateOps()
- .stream().map(Enum::name).collect(Collectors.joining(",")) + "\": " +
numThreads + "}");
- config.set(Property.MANAGER_FATE_META_CONFIG, "{\"" +
Fate.FateOperation.getAllMetaFateOps()
- .stream().map(Enum::name).collect(Collectors.joining(",")) + "\": " +
numThreads + "}");
+ config
+ .set(Property.MANAGER_FATE_USER_CONFIG,
+ String
+ .format("{'%s':{'%s': %d}}", name,
Fate.FateOperation.getAllUserFateOps().stream()
+ .map(Enum::name).collect(Collectors.joining(",")),
numThreads)
+ .replace("'", "\""));
+ config
+ .set(Property.MANAGER_FATE_META_CONFIG,
+ String
+ .format("{'%s':{'%s': %d}}", name,
Fate.FateOperation.getAllMetaFateOps().stream()
+ .map(Enum::name).collect(Collectors.joining(",")),
numThreads)
+ .replace("'", "\""));
config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "60m");
return config;
}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
index f022e83f16..00825815ee 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@ -40,15 +40,15 @@ public class FlakyFate<T> extends Fate<T> {
AccumuloConfiguration conf) {
super(environment, store, false, toLogStrFunc, conf, new
ScheduledThreadPoolExecutor(2));
for (var poolConfig : getPoolConfigurations(conf,
getStore().type()).entrySet()) {
- fateExecutors.add(
- new FlakyFateExecutor<>(this, environment, poolConfig.getKey(),
poolConfig.getValue()));
+ fateExecutors.add(new FlakyFateExecutor<>(this, environment,
poolConfig.getKey(),
+ poolConfig.getValue().getValue(), poolConfig.getValue().getKey()));
}
}
private static class FlakyFateExecutor<T> extends FateExecutor<T> {
- private FlakyFateExecutor(Fate<T> fate, T environment, Set<FateOperation>
fateOps,
- int poolSize) {
- super(fate, environment, fateOps, poolSize);
+ private FlakyFateExecutor(Fate<T> fate, T environment, Set<FateOperation>
fateOps, int poolSize,
+ String name) {
+ super(fate, environment, fateOps, poolSize, name);
}
@Override
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java
b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java
index ae6d485d05..d2c79855f4 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
@@ -308,7 +309,8 @@ public abstract class MultipleStoresITBase extends
SharedMiniClusterBase {
final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
final Set<ZooUtil.LockID> liveLocks = new HashSet<>();
final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains;
- final AccumuloConfiguration config =
FateTestUtil.createTestFateConfig(numThreads);
+ final AccumuloConfiguration config =
+ FateTestUtil.updateFateConfig(new ConfigurationCopy(), numThreads,
"AllFateOps");
Map<FateId,FateStore.FateReservation> reservations;
try (final FateStore<LatchTestEnv> store1 = testStoreFactory.create(lock1,
isLockHeld)) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
index b413258ce2..65a3783778 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
@@ -55,15 +55,15 @@ public class SlowFateSplit<T> extends Fate<T> {
AccumuloConfiguration conf) {
super(environment, store, false, toLogStrFunc, conf, new
ScheduledThreadPoolExecutor(2));
for (var poolConfig : getPoolConfigurations(conf,
getStore().type()).entrySet()) {
- fateExecutors.add(
- new SlowFateSplitExecutor(this, environment, poolConfig.getKey(),
poolConfig.getValue()));
+ fateExecutors.add(new SlowFateSplitExecutor(this, environment,
poolConfig.getKey(),
+ poolConfig.getValue().getValue(), poolConfig.getValue().getKey()));
}
}
private class SlowFateSplitExecutor extends FateExecutor<T> {
private SlowFateSplitExecutor(Fate<T> fate, T environment,
Set<Fate.FateOperation> fateOps,
- int poolSize) {
- super(fate, environment, fateOps, poolSize);
+ int poolSize, String name) {
+ super(fate, environment, fateOps, poolSize, name);
}
@Override
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index ff1859c092..6d618f55f9 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -62,6 +62,8 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
@@ -96,6 +98,7 @@ public class MetricsIT extends ConfigurableMacBase implements
MetricsProducer {
private static final int numFateThreadsPool1 = 5;
private static final int numFateThreadsPool2 = 10;
private static final int numFateThreadsPool3 = 15;
+ private static final String allOpsFateExecutorName = "pool1";
@Override
protected Duration defaultTimeout() {
@@ -131,9 +134,8 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
TestStatsDRegistryFactory.SERVER_PORT,
Integer.toString(sink.getPort()));
cfg.setSystemProperties(sysProps);
// custom config for the fate thread pools.
- // starting FATE config for each FATE type (USER and META) will be:
- // {<all fate ops>: numFateThreadsPool1} (one pool for all ops of size
numFateThreadsPool1)
- var fatePoolsConfig =
FateTestUtil.createTestFateConfig(numFateThreadsPool1);
+ var fatePoolsConfig = FateTestUtil.updateFateConfig(new
ConfigurationCopy(),
+ numFateThreadsPool1, allOpsFateExecutorName);
cfg.setProperty(Property.MANAGER_FATE_USER_CONFIG.getKey(),
fatePoolsConfig.get(Property.MANAGER_FATE_USER_CONFIG));
cfg.setProperty(Property.MANAGER_FATE_META_CONFIG.getKey(),
@@ -261,7 +263,7 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
// Tests metrics for Fate's thread pools. Tests that metrics are seen as
expected, and config
// changes to the thread pools are accurately reflected in the metrics.
This includes checking
// that old thread pool metrics are removed, new ones are created, size
changes to thread
- // pools are reflected, and the ops assigned and instance type tags are
seen as expected
+ // pools are reflected, and the tags are seen as expected
final String table = getUniqueNames(1)[0];
// prevent any system initiated fate operations from running, which may
interfere with our
@@ -299,9 +301,11 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
var tags = metric.getTags();
var instanceType = FateInstanceType
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
- var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+ var poolName = tags.get(FateExecutorMetrics.POOL_NAME_TAG_KEY);
- verifyFateMetricTags(opsAssigned, instanceType);
+ // ensure that the pool name seen in the tag is in the fate
configuration
+
getFateOpsFromConfig(getCluster().getServerContext().getConfiguration(),
instanceType,
+ poolName);
if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())
&& numFateThreadsPool1 == Integer.parseInt(metric.getValue())) {
@@ -324,11 +328,8 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
assertTrue(sawExpectedTotalThreadsUserMetric);
assertTrue(sawExpectedTotalThreadsMetaMetric);
- // Now change the config from:
- // {<all fate ops>: numFateThreadsPool1}
- // ->
- // {<all fate ops except split>: numFateThreadsPool2,
- // <split operation>: numFateThreadsPool3}
+ // change config such that existing fate executor (pool1) is shut down,
and two new fate
+ // executors (pool2 and pool3) are started
changeFateConfig(client, FateInstanceType.USER);
changeFateConfig(client, FateInstanceType.META);
@@ -363,16 +364,16 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
var tags = metric.getTags();
var instanceType = FateInstanceType
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
- var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+ var poolName = tags.get(FateExecutorMetrics.POOL_NAME_TAG_KEY);
- verifyFateMetricTags(opsAssigned, instanceType);
+ // get the fate operations associated with the pool name seen in the
metrics
+ Set<Fate.FateOperation> fateOps = getFateOpsFromConfig(
+ getCluster().getServerContext().getConfiguration(),
instanceType, poolName);
- Set<Fate.FateOperation> fateOpsFromMetric =
gatherFateOpsFromTag(opsAssigned);
-
- if (fateOpsFromMetric.equals(Fate.FateOperation.getAllUserFateOps())
- ||
fateOpsFromMetric.equals(Fate.FateOperation.getAllMetaFateOps())) {
+ if (fateOps.equals(Fate.FateOperation.getAllUserFateOps())
+ || fateOps.equals(Fate.FateOperation.getAllMetaFateOps())) {
sawAnyMetricPool1 = true;
- } else if
(fateOpsFromMetric.equals(Arrays.stream(Fate.FateOperation.values())
+ } else if (fateOps.equals(Arrays.stream(Fate.FateOperation.values())
.filter(fo ->
!fo.equals(SlowFateSplitManager.SLOW_OP)).collect(Collectors.toSet()))
&& numFateThreadsPool2 == Integer.parseInt(metric.getValue())) {
// pool2
@@ -390,7 +391,7 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
sawExpectedTotalThreadsMetaMetricPool2 = true;
}
}
- } else if
(fateOpsFromMetric.equals(Set.of(SlowFateSplitManager.SLOW_OP))
+ } else if (fateOps.equals(Set.of(SlowFateSplitManager.SLOW_OP))
&& numFateThreadsPool3 == Integer.parseInt(metric.getValue())) {
// pool3
// total = inactive = size pool3
@@ -429,33 +430,31 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
}
/**
- * Verifies what should always be true for fate metrics tags: The ops assigned tag should include
- * all the ops that are associated with a pool
+ * gets the fate operations from the configuration associated with the given pool name, throwing a
+ * runtime exception if one does not exist.
*/
- private void verifyFateMetricTags(String opsAssignedInMetric, FateInstanceType type) {
- var opsAssignedInConfig =
- Fate.getPoolConfigurations(getCluster().getServerContext().getConfiguration(), type);
- assertNotNull(opsAssignedInConfig);
-
- var fateOpsFromMetric = gatherFateOpsFromTag(opsAssignedInMetric);
-
- assertNotNull(opsAssignedInConfig.get(fateOpsFromMetric));
+ private Set<Fate.FateOperation> getFateOpsFromConfig(AccumuloConfiguration config,
+ FateInstanceType type, String poolName) throws RuntimeException {
+ var configForPoolNameSet = Fate.getPoolConfigurations(config, type).entrySet().stream()
+ .filter(entry -> poolName.contains(entry.getValue().getKey())).collect(Collectors.toSet());
+ assertEquals(1, configForPoolNameSet.size());
+ return configForPoolNameSet.iterator().next().getKey();
}
private void changeFateConfig(AccumuloClient client, FateInstanceType type) throws Exception {
- Set<Fate.FateOperation> allFateOps = null;
+ Set<Fate.FateOperation> allOpsMinusSlowOp = null;
if (type == FateInstanceType.USER) {
- allFateOps = new HashSet<>(Fate.FateOperation.getAllUserFateOps());
+ allOpsMinusSlowOp = new HashSet<>(Fate.FateOperation.getAllUserFateOps());
} else if (type == FateInstanceType.META) {
- allFateOps = new HashSet<>(Fate.FateOperation.getAllMetaFateOps());
+ allOpsMinusSlowOp = new HashSet<>(Fate.FateOperation.getAllMetaFateOps());
}
- assertNotNull(allFateOps);
- allFateOps.remove(SlowFateSplitManager.SLOW_OP);
- String newFateConfig =
- "{'" + allFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "': "
- + numFateThreadsPool2 + ",'" + SlowFateSplitManager.SLOW_OP.name() + "': "
- + numFateThreadsPool3 + "}";
- newFateConfig = newFateConfig.replace("'", "\"");
+ assertNotNull(allOpsMinusSlowOp);
+ allOpsMinusSlowOp.remove(SlowFateSplitManager.SLOW_OP);
+ String newFateConfig = String
+ .format("{'pool2':{'%s': %d},'pool3':{'%s': %d}}",
+ allOpsMinusSlowOp.stream().map(Enum::name).collect(Collectors.joining(",")),
+ numFateThreadsPool2, SlowFateSplitManager.SLOW_OP.name(), numFateThreadsPool3)
+ .replace("'", "\"");
if (type == FateInstanceType.USER) {
client.instanceOperations().setProperty(Property.MANAGER_FATE_USER_CONFIG.getKey(),
@@ -466,15 +465,6 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
}
}
- private Set<Fate.FateOperation> gatherFateOpsFromTag(String opsAssigned) {
- String[] ops = opsAssigned.split("\\.");
- Set<Fate.FateOperation> fateOpsFromMetric = new HashSet<>();
- for (var op : ops) {
- fateOpsFromMetric.add(Fate.FateOperation.valueOf(op.toUpperCase()));
- }
- return fateOpsFromMetric;
- }
-
static void doWorkToGenerateMetrics(AccumuloClient client, Class<?> testClass) throws Exception {
String tableName = testClass.getSimpleName();
client.tableOperations().create(tableName);
@@ -553,7 +543,7 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
assertEquals("value2", a.getTags().get("tag2"));
// check the length of the tag value is sane
- final int MAX_EXPECTED_TAG_LEN = 512;
+ final int MAX_EXPECTED_TAG_LEN = 128;
a.getTags().forEach((k, v) -> assertTrue(v.length() < MAX_EXPECTED_TAG_LEN));
});
}