This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 35f2cf5dc4 FateOpsCommandsIT enhancements (#4400)
35f2cf5dc4 is described below
commit 35f2cf5dc4ddce4b13fdbc4bf83b9d7cd1a0073e
Author: Kevin Rathbun <[email protected]>
AuthorDate: Tue Jun 4 11:27:45 2024 -0400
FateOpsCommandsIT enhancements (#4400)
- Added test to verify name and step of transactions using summary and
print commands
- Added check to filter by non-existent FateId in summary and print
command tests
- Some other small misc improvemements
* Misc Changes:
- Changed result.contains("0 transactions") to result.contains(" 0
transactions") to avoid returning true if the number of transactions is not 0
but ends in 0
- Added shutting down the compactor to BeforeEach (this also has the effect
of adding this shutdown to testTransactionNameAndStep() which I mistakenly
thought shouldn't shutdown the compactor)
- Simplified testTransactionNameAndStep() (unneccessary config and checks).
Also added deleting the table at the end of the test.
---
.../server/util/fateCommand/FateTxnDetails.java | 27 ++-
.../accumulo/test/fate/FateOpsCommandsIT.java | 244 +++++++++++++++------
2 files changed, 201 insertions(+), 70 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
index 66c12f81cd..0245b07c4d 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.server.util.fateCommand;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -74,8 +75,10 @@ public class FateTxnDetails implements
Comparable<FateTxnDetails> {
if (txnStatus.getFateId() != null) {
fateId = txnStatus.getFateId().canonical();
}
- locksHeld = formatLockInfo(txnStatus.getHeldLocks(), idsToNameMap);
- locksWaiting = formatLockInfo(txnStatus.getWaitingLocks(), idsToNameMap);
+ locksHeld =
+ Collections.unmodifiableList(formatLockInfo(txnStatus.getHeldLocks(),
idsToNameMap));
+ locksWaiting =
+
Collections.unmodifiableList(formatLockInfo(txnStatus.getWaitingLocks(),
idsToNameMap));
}
private List<String> formatLockInfo(final List<String> lockInfo,
@@ -92,6 +95,18 @@ public class FateTxnDetails implements
Comparable<FateTxnDetails> {
return formattedLocks;
}
+ public long getRunning() {
+ return running;
+ }
+
+ public String getTxName() {
+ return txName;
+ }
+
+ public String getStep() {
+ return step;
+ }
+
public String getFateId() {
return fateId;
}
@@ -100,6 +115,14 @@ public class FateTxnDetails implements
Comparable<FateTxnDetails> {
return status;
}
+ public List<String> getLocksHeld() {
+ return locksHeld;
+ }
+
+ public List<String> getLocksWaiting() {
+ return locksWaiting;
+ }
+
/**
* Sort by running time in reverse (oldest txn first). txid is unique as
used to break times and
* so that compareTo remains consistent with hashCode and equals methods.
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
index 5bddd4f35c..f448149d76 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
@@ -28,12 +28,20 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.Fate;
@@ -41,15 +49,20 @@ import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.server.util.fateCommand.FateSummaryReport;
+import org.apache.accumulo.server.util.fateCommand.FateTxnDetails;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public abstract class FateOpsCommandsIT extends ConfigurableMacBase
@@ -66,6 +79,15 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s");
}
+ @BeforeEach
+ public void shutdownCompactor() throws Exception {
+ // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION
transaction which was
+ // initiated on starting the manager, causing the test to fail. Stopping
the compactor fixes
+ // this issue.
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+ Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000);
+ }
+
@Test
public void testFateSummaryCommand() throws Exception {
executeTest(this::testFateSummaryCommand);
@@ -75,15 +97,9 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
throws Exception {
// Configure Fate
Fate<TestEnv> fate = initializeFate(store);
- // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION
transaction which was
- // initiated on starting the manager, causing the test to fail. Stopping
the compactor fixes
- // this issue.
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000);
// validate blank report, no transactions have started
- ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j",
"-s", "NEW", "-s",
- "IN_PROGRESS", "-s", "FAILED");
+ ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j");
assertEquals(0, p.getProcess().waitFor());
String result = p.readStdOut();
result = result.substring(result.indexOf("{"), result.lastIndexOf("}") +
1);
@@ -93,10 +109,10 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
assertTrue(report.getStatusCounts().isEmpty());
assertTrue(report.getStepCounts().isEmpty());
assertTrue(report.getCmdCounts().isEmpty());
- assertEquals(Set.of("FAILED", "IN_PROGRESS", "NEW"),
report.getStatusFilterNames());
+ assertTrue(report.getStatusFilterNames().isEmpty());
assertTrue(report.getInstanceTypesFilterNames().isEmpty());
assertTrue(report.getFateIdFilter().isEmpty());
- assertEquals(0, report.getFateDetails().size());
+ validateFateDetails(report.getFateDetails(), 0, null);
// create Fate transactions
FateId fateId1 = fate.startTransaction();
@@ -117,12 +133,11 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
assertTrue(report.getStatusFilterNames().isEmpty());
assertTrue(report.getInstanceTypesFilterNames().isEmpty());
assertTrue(report.getFateIdFilter().isEmpty());
- assertEquals(2, report.getFateDetails().size());
- ArrayList<String> fateIdsFromResult1 = new ArrayList<>();
- report.getFateDetails().forEach((d) -> {
- fateIdsFromResult1.add(d.getFateId());
- });
- assertTrue(fateIdsFromResult1.containsAll(fateIdsStarted));
+ validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
+
+ /*
+ * Test filtering by FateIds
+ */
// validate filtering by both transactions
p = getCluster().exec(Admin.class, "fate", fateId1.canonical(),
fateId2.canonical(),
@@ -140,12 +155,7 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
assertTrue(report.getInstanceTypesFilterNames().isEmpty());
assertEquals(2, report.getFateIdFilter().size());
assertTrue(report.getFateIdFilter().containsAll(fateIdsStarted));
- assertEquals(2, report.getFateDetails().size());
- ArrayList<String> fateIdsFromResult2 = new ArrayList<>();
- report.getFateDetails().forEach((d) -> {
- fateIdsFromResult2.add(d.getFateId());
- });
- assertTrue(fateIdsFromResult2.containsAll(fateIdsStarted));
+ validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
// validate filtering by just one transaction
p = getCluster().exec(Admin.class, "fate", fateId1.canonical(),
"--summary", "-j");
@@ -162,12 +172,29 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
assertTrue(report.getInstanceTypesFilterNames().isEmpty());
assertEquals(1, report.getFateIdFilter().size());
assertTrue(report.getFateIdFilter().contains(fateId1.canonical()));
- assertEquals(1, report.getFateDetails().size());
- ArrayList<String> fateIdsFromResult3 = new ArrayList<>();
- report.getFateDetails().forEach((d) -> {
- fateIdsFromResult3.add(d.getFateId());
- });
- assertTrue(fateIdsFromResult3.contains(fateId1.canonical()));
+ validateFateDetails(report.getFateDetails(), 1, fateIdsStarted);
+
+ // validate filtering by non-existent transaction
+ FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID());
+ p = getCluster().exec(Admin.class, "fate", fakeFateId.canonical(),
"--summary", "-j");
+ assertEquals(0, p.getProcess().waitFor());
+ result = p.readStdOut();
+ result = result.substring(result.indexOf("{"), result.lastIndexOf("}") +
1);
+ report = FateSummaryReport.fromJson(result);
+ assertNotNull(report);
+ assertNotEquals(0, report.getReportTime());
+ assertFalse(report.getStatusCounts().isEmpty());
+ assertFalse(report.getStepCounts().isEmpty());
+ assertFalse(report.getCmdCounts().isEmpty());
+ assertTrue(report.getStatusFilterNames().isEmpty());
+ assertTrue(report.getInstanceTypesFilterNames().isEmpty());
+ assertEquals(1, report.getFateIdFilter().size());
+ assertTrue(report.getFateIdFilter().contains(fakeFateId.canonical()));
+ validateFateDetails(report.getFateDetails(), 0, fateIdsStarted);
+
+ /*
+ * Test filtering by States
+ */
// validate status filter by including only FAILED transactions, should be
none
p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s",
"FAILED");
@@ -183,7 +210,27 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
assertEquals(Set.of("FAILED"), report.getStatusFilterNames());
assertTrue(report.getInstanceTypesFilterNames().isEmpty());
assertTrue(report.getFateIdFilter().isEmpty());
- assertEquals(0, report.getFateDetails().size());
+ validateFateDetails(report.getFateDetails(), 0, fateIdsStarted);
+
+ // validate status filter by including only NEW transactions, should be 2
+ p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "NEW");
+ assertEquals(0, p.getProcess().waitFor());
+ result = p.readStdOut();
+ result = result.substring(result.indexOf("{"), result.lastIndexOf("}") +
1);
+ report = FateSummaryReport.fromJson(result);
+ assertNotNull(report);
+ assertNotEquals(0, report.getReportTime());
+ assertFalse(report.getStatusCounts().isEmpty());
+ assertFalse(report.getStepCounts().isEmpty());
+ assertFalse(report.getCmdCounts().isEmpty());
+ assertEquals(Set.of("NEW"), report.getStatusFilterNames());
+ assertTrue(report.getInstanceTypesFilterNames().isEmpty());
+ assertTrue(report.getFateIdFilter().isEmpty());
+ validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
+
+ /*
+ * Test filtering by FateInstanceType
+ */
// validate FateInstanceType filter by only including transactions with
META filter
p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-t",
"META");
@@ -200,14 +247,9 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
assertEquals(Set.of("META"), report.getInstanceTypesFilterNames());
assertTrue(report.getFateIdFilter().isEmpty());
if (store.type() == FateInstanceType.META) {
- assertEquals(2, report.getFateDetails().size());
- ArrayList<String> fateIdsFromResult4 = new ArrayList<>();
- report.getFateDetails().forEach((d) -> {
- fateIdsFromResult4.add(d.getFateId());
- });
- assertTrue(fateIdsFromResult4.containsAll(fateIdsStarted));
+ validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
} else { // USER
- assertEquals(0, report.getFateDetails().size());
+ validateFateDetails(report.getFateDetails(), 0, fateIdsStarted);
}
// validate FateInstanceType filter by only including transactions with
USER filter
@@ -225,14 +267,9 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
assertEquals(Set.of("USER"), report.getInstanceTypesFilterNames());
assertTrue(report.getFateIdFilter().isEmpty());
if (store.type() == FateInstanceType.META) {
- assertEquals(0, report.getFateDetails().size());
+ validateFateDetails(report.getFateDetails(), 0, fateIdsStarted);
} else { // USER
- assertEquals(2, report.getFateDetails().size());
- ArrayList<String> fateIdsFromResult4 = new ArrayList<>();
- report.getFateDetails().forEach((d) -> {
- fateIdsFromResult4.add(d.getFateId());
- });
- assertTrue(fateIdsFromResult4.containsAll(fateIdsStarted));
+ validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
}
fate.shutdown(10, TimeUnit.MINUTES);
@@ -247,11 +284,6 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
throws Exception {
// Configure Fate
Fate<TestEnv> fate = initializeFate(store);
- // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION
transaction which was
- // initiated on starting the manager, causing the test to fail. Stopping
the compactor fixes
- // this issue.
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000);
// Start some transactions
FateId fateId1 = fate.startTransaction();
@@ -280,17 +312,12 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
throws Exception {
// Configure Fate
Fate<TestEnv> fate = initializeFate(store);
- // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION
transaction which was
- // initiated on starting the manager, causing the test to fail. Stopping
the compactor fixes
- // this issue.
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000);
// validate no transactions
ProcessInfo p = getCluster().exec(Admin.class, "fate", "--print");
assertEquals(0, p.getProcess().waitFor());
String result = p.readStdOut();
- assertTrue(result.contains("0 transactions"));
+ assertTrue(result.contains(" 0 transactions"));
// create Fate transactions
FateId fateId1 = fate.startTransaction();
@@ -339,6 +366,14 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
fateIdsFromResult = getFateIdsFromPrint(result);
assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(),
"NEW"), fateIdsFromResult);
+ // Filter by non-existent FateId
+ FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID());
+ p = getCluster().exec(Admin.class, "fate", fakeFateId.canonical(),
"--print");
+ assertEquals(0, p.getProcess().waitFor());
+ result = p.readStdOut();
+ fateIdsFromResult = getFateIdsFromPrint(result);
+ assertEquals(0, fateIdsFromResult.size());
+
/*
* Test filtering by FateInstanceType
*/
@@ -370,6 +405,73 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
fate.shutdown(10, TimeUnit.MINUTES);
}
+ @Test
+ public void testTransactionNameAndStep() throws Exception {
+ executeTest(this::testTransactionNameAndStep);
+ }
+
+ protected void testTransactionNameAndStep(FateStore<TestEnv> store,
ServerContext sctx)
+ throws Exception {
+ // Since the other tests just use NEW transactions for simplicity, there
are some fields of the
+ // summary and print outputs which are null and not tested for
(transaction name and transaction
+ // step). This test uses seeded/in progress transactions to test that the
summary and print
+ // commands properly output these fields.
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
+ final String table = getUniqueNames(1)[0];
+
+ IteratorSetting is = new IteratorSetting(1, SlowIterator.class);
+ is.addOption("sleepTime", "10000");
+
+ NewTableConfiguration cfg = new NewTableConfiguration();
+ cfg.attachIterator(is, EnumSet.of(IteratorUtil.IteratorScope.majc));
+ client.tableOperations().create(table, cfg);
+
+ ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
+ client.tableOperations().flush(table, null, null, true);
+
+ // create 2 Fate transactions
+ client.tableOperations().compact(table, null, null, false, false);
+ client.tableOperations().compact(table, null, null, false, false);
+ List<String> fateIdsStarted = new ArrayList<>();
+
+ ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary",
"-j");
+ assertEquals(0, p.getProcess().waitFor());
+
+ String result = p.readStdOut();
+ result = result.substring(result.indexOf("{"), result.lastIndexOf("}") +
1);
+ FateSummaryReport report = FateSummaryReport.fromJson(result);
+
+ // Validate transaction name and transaction step from summary command
+
+ for (FateTxnDetails d : report.getFateDetails()) {
+ assertEquals("TABLE_COMPACT", d.getTxName());
+ assertEquals("CompactionDriver", d.getStep());
+ fateIdsStarted.add(d.getFateId());
+ }
+ assertEquals(2, fateIdsStarted.size());
+
+ p = getCluster().exec(Admin.class, "fate", "--print");
+ assertEquals(0, p.getProcess().waitFor());
+ result = p.readStdOut();
+
+ // Validate transaction name and transaction step from print command
+
+ String[] lines = result.split("\n");
+ // Filter out the result to just include the info about the transactions
+ List<String> transactionInfo = Arrays.stream(lines)
+ .filter(
+ line -> line.contains(fateIdsStarted.get(0)) ||
line.contains(fateIdsStarted.get(1)))
+ .collect(Collectors.toList());
+ assertEquals(2, transactionInfo.size());
+ for (String info : transactionInfo) {
+ assertTrue(info.contains("TABLE_COMPACT"));
+ assertTrue(info.contains("op: CompactionDriver"));
+ }
+
+ client.tableOperations().delete(table);
+ }
+ }
+
@Test
public void testFateCancelCommand() throws Exception {
executeTest(this::testFateCancelCommand);
@@ -379,11 +481,6 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
throws Exception {
// Configure Fate
Fate<TestEnv> fate = initializeFate(store);
- // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION
transaction which was
- // initiated on starting the manager, causing the test to fail. Stopping
the compactor fixes
- // this issue.
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000);
// Start some transactions
FateId fateId1 = fate.startTransaction();
@@ -417,11 +514,6 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
throws Exception {
// Configure Fate
Fate<TestEnv> fate = initializeFate(store);
- // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION
transaction which was
- // initiated on starting the manager, causing the test to fail. Stopping
the compactor fixes
- // this issue.
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000);
// Start some transactions
FateId fateId1 = fate.startTransaction();
@@ -467,11 +559,6 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
throws Exception {
// Configure Fate
Fate<TestEnv> fate = initializeFate(store);
- // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION
transaction which was
- // initiated on starting the manager, causing the test to fail. Stopping
the compactor fixes
- // this issue.
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000);
// Start some transactions
FateId fateId1 = fate.startTransaction();
@@ -548,6 +635,27 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
return fateIdToStatus;
}
+ /**
+ * Validates the fate details of NEW transactions
+ *
+ * @param details the fate details from the {@link FateSummaryReport}
+ * @param expDetailsSize the expected size of details
+ * @param fateIdsStarted the list of fate ids that have been started
+ */
+ private void validateFateDetails(Set<FateTxnDetails> details, int
expDetailsSize,
+ List<String> fateIdsStarted) {
+ assertEquals(expDetailsSize, details.size());
+ for (FateTxnDetails d : details) {
+ assertTrue(fateIdsStarted.contains(d.getFateId()));
+ assertEquals("NEW", d.getStatus());
+ assertEquals("?", d.getStep());
+ assertEquals("?", d.getTxName());
+ assertNotEquals(0, d.getRunning());
+ assertEquals("[]", d.getLocksHeld().toString());
+ assertEquals("[]", d.getLocksWaiting().toString());
+ }
+ }
+
private Fate<TestEnv> initializeFate(FateStore<TestEnv> store) {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");