This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new c5c6d5b588 Improvements and cleanup to ExternalCompaction ITs and
related code (#3661)
c5c6d5b588 is described below
commit c5c6d5b588b98e1930fd21c3a66bbf2b2fe92f68
Author: Dave Marion <[email protected]>
AuthorDate: Thu Jul 27 08:38:52 2023 -0400
Improvements and cleanup to ExternalCompaction ITs and related code (#3661)
Modified ExternalCompactionITs to remove the startup/shutdown of Compactors
that are defined in the compaction services created by the MAC configuration
callback. MAC now starts these Compactors based on the compaction service
configuration. Made some modifications to ExternalCompactionITs to reduce
the
load on ZK as some calls where being made to find the compaction coordinator
address inside of a loop, and that wasn't necessary. Made a modification to
the
CompactionCoordinator so that it won't try to update tablet metadata for a
table
that has been deleted. Made a modification to the Manager so that it only
advertises
the FATE and CompactionCoordinator Thrift service after the Manager has
started up.
---
.../java/org/apache/accumulo/manager/Manager.java | 11 ++-
.../coordinator/CompactionCoordinator.java | 13 +++-
.../compaction/ExternalCompactionProgressIT.java | 15 +++-
.../compaction/ExternalCompactionTestUtils.java | 50 +++++++------
.../test/compaction/ExternalCompaction_1_IT.java | 34 ++-------
.../test/compaction/ExternalCompaction_2_IT.java | 82 ++++++++++++----------
.../test/compaction/ExternalCompaction_3_IT.java | 36 +++++-----
7 files changed, 118 insertions(+), 123 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 09c673aef0..aa6ef53d19 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1362,8 +1362,8 @@ public class Manager extends AbstractServer
String address = sa.address.toString();
UUID uuid = sld.getServerUUID(ThriftService.MANAGER);
ServiceDescriptors descriptors = new ServiceDescriptors();
- for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER,
- ThriftService.COORDINATOR}) {
+ for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER,
ThriftService.COORDINATOR,
+ ThriftService.FATE}) {
descriptors.addService(new ServiceDescriptor(uuid, svc, address,
this.getResourceGroup()));
}
@@ -1593,11 +1593,8 @@ public class Manager extends AbstractServer
UUID zooLockUUID = UUID.randomUUID();
ServiceDescriptors descriptors = new ServiceDescriptors();
- for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER,
- ThriftService.COORDINATOR}) {
- descriptors.addService(
- new ServiceDescriptor(zooLockUUID, svc, managerClientAddress,
this.getResourceGroup()));
- }
+ descriptors.addService(new ServiceDescriptor(zooLockUUID,
ThriftService.MANAGER,
+ managerClientAddress, this.getResourceGroup()));
ServiceLockData sld = new ServiceLockData(descriptors);
while (true) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 191f990a17..49a27ef7db 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
@@ -1059,9 +1060,15 @@ public class CompactionCoordinator implements
CompactionCoordinatorService.Iface
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
compactions.forEach((ecid, extent) -> {
-
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
-
.requirePrevEndRow(extent.prevEndRow()).deleteExternalCompaction(ecid)
- .submit(tabletMetadata ->
!tabletMetadata.getExternalCompactions().containsKey(ecid));
+ try {
+ ctx.requireNotDeleted(extent.tableId());
+
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
+
.requirePrevEndRow(extent.prevEndRow()).deleteExternalCompaction(ecid)
+ .submit(tabletMetadata ->
!tabletMetadata.getExternalCompactions().containsKey(ecid));
+ } catch (TableDeletedException e) {
+ LOG.warn("Table {} was deleted, unable to update metadata for
compaction failure.",
+ extent.tableId());
+ }
});
tabletsMutator.process().forEach((extent, result) -> {
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index b1b3883f96..b77ec14a8e 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -31,6 +31,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.Accumulo;
@@ -38,17 +39,22 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.net.HostAndPort;
+
/**
* Tests that external compactions report progress from start to finish. To
prevent flaky test
* failures, we only measure progress in quarter segments: STARTED, QUARTER,
HALF, THREE_QUARTERS.
@@ -124,7 +130,14 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
* Check running compaction progress.
*/
private void checkRunning() throws TException {
- var ecList = getRunningCompactions(getCluster().getServerContext());
+
+ ServerContext ctx = getCluster().getServerContext();
+ Optional<HostAndPort> coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(ctx);
+ if (coordinatorHost.isEmpty()) {
+ throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
+ }
+
+ var ecList = getRunningCompactions(ctx, coordinatorHost);
var ecMap = ecList.getCompactions();
if (ecMap != null) {
ecMap.forEach((ecid, ec) -> {
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index a3be15838b..c9b0108278 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -233,13 +233,8 @@ public class ExternalCompactionTestUtils {
coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
- public static TExternalCompactionList getRunningCompactions(ClientContext
context)
- throws TException {
- Optional<HostAndPort> coordinatorHost =
- ExternalCompactionUtil.findCompactionCoordinator(context);
- if (coordinatorHost.isEmpty()) {
- throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
- }
+ public static TExternalCompactionList getRunningCompactions(ClientContext
context,
+ Optional<HostAndPort> coordinatorHost) throws TException {
CompactionCoordinatorService.Client client =
ThriftUtil.getClient(ThriftClientTypes.COORDINATOR,
coordinatorHost.orElseThrow(), context);
try {
@@ -251,13 +246,8 @@ public class ExternalCompactionTestUtils {
}
}
- private static TExternalCompactionList getCompletedCompactions(ClientContext
context)
- throws Exception {
- Optional<HostAndPort> coordinatorHost =
- ExternalCompactionUtil.findCompactionCoordinator(context);
- if (coordinatorHost.isEmpty()) {
- throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
- }
+ private static TExternalCompactionList getCompletedCompactions(ClientContext
context,
+ Optional<HostAndPort> coordinatorHost) throws Exception {
CompactionCoordinatorService.Client client =
ThriftUtil.getClient(ThriftClientTypes.COORDINATOR,
coordinatorHost.orElseThrow(), context);
try {
@@ -309,8 +299,13 @@ public class ExternalCompactionTestUtils {
public static int confirmCompactionRunning(ServerContext ctx,
Set<ExternalCompactionId> ecids)
throws Exception {
int matches = 0;
+ Optional<HostAndPort> coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(ctx);
+ if (coordinatorHost.isEmpty()) {
+ throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
+ }
while (matches == 0) {
- TExternalCompactionList running =
ExternalCompactionTestUtils.getRunningCompactions(ctx);
+ TExternalCompactionList running =
+ ExternalCompactionTestUtils.getRunningCompactions(ctx,
coordinatorHost);
if (running.getCompactions() != null) {
for (ExternalCompactionId ecid : ecids) {
TExternalCompaction tec =
running.getCompactions().get(ecid.canonical());
@@ -329,21 +324,24 @@ public class ExternalCompactionTestUtils {
public static void confirmCompactionCompleted(ServerContext ctx,
Set<ExternalCompactionId> ecids,
TCompactionState expectedState) throws Exception {
+ Optional<HostAndPort> coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(ctx);
+ if (coordinatorHost.isEmpty()) {
+ throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
+ }
+
// The running compaction should be removed
- TExternalCompactionList running =
ExternalCompactionTestUtils.getRunningCompactions(ctx);
- while (running.getCompactions() != null) {
- running = ExternalCompactionTestUtils.getRunningCompactions(ctx);
- if (running.getCompactions() == null) {
- UtilWaitThread.sleep(250);
- }
+ TExternalCompactionList running =
+ ExternalCompactionTestUtils.getRunningCompactions(ctx,
coordinatorHost);
+ while (running.getCompactions() != null &&
running.getCompactions().keySet().stream()
+ .anyMatch((e) -> ecids.contains(ExternalCompactionId.of(e)))) {
+ running = ExternalCompactionTestUtils.getRunningCompactions(ctx,
coordinatorHost);
}
// The compaction should be in the completed list with the expected state
- TExternalCompactionList completed =
ExternalCompactionTestUtils.getCompletedCompactions(ctx);
+ TExternalCompactionList completed =
+ ExternalCompactionTestUtils.getCompletedCompactions(ctx,
coordinatorHost);
while (completed.getCompactions() == null) {
- completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx);
- if (completed.getCompactions() == null) {
- UtilWaitThread.sleep(50);
- }
+ UtilWaitThread.sleep(50);
+ completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx,
coordinatorHost);
}
for (ExternalCompactionId e : ecids) {
TExternalCompaction tec = completed.getCompactions().get(e.canonical());
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 695c87b897..5085c310b4 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -22,7 +22,6 @@ import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GR
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP4;
-import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP5;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP6;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP8;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
@@ -74,7 +73,6 @@ import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -95,13 +93,6 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
startMiniClusterWithConfig(new ExternalCompaction1Config());
}
- @AfterEach
- public void tearDown() throws Exception {
- // The ExternalDoNothingCompactor needs to be restarted between tests
- getCluster().getClusterControl().stop(ServerType.COMPACTOR);
-
getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups();
- }
-
public static class TestFilter extends Filter {
int modulus = 1;
@@ -159,10 +150,6 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
writeData(client, table1);
writeData(client, table2);
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1,
1);
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP2,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
compact(client, table1, 2, GROUP1, true);
verify(client, table1, 2);
@@ -186,12 +173,7 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
writeData(client, table1);
verify(client, table1, 1);
- // ELASTICITY_TODO the compactors started by mini inspecting the config
were interfering with
- // starting the ExternalDoNothingCompactor, so killed all compactors.
This is not the best way
- // to handle this.
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
-
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP3,
1);
getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
ExternalDoNothingCompactor.class);
@@ -224,6 +206,10 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
} finally {
// We stopped the TServer and started our own, restart the original
TabletServers
getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+ // Restart the regular compactors
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+ getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
}
}
@@ -238,10 +224,6 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
writeData(client, table1);
- // ELASTICITY_TODO there is already one compactor started by mini based
on config
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP4,
2);
- getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
compact(client, table1, 3, GROUP4, true);
verify(client, table1, 3);
@@ -252,9 +234,6 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
public void testConfigurer() throws Exception {
String tableName = this.getUniqueNames(1)[0];
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP5,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
@@ -328,8 +307,6 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
createTable(client, table1, "cs6");
writeData(client, table1);
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP6,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR);
compact(client, table1, 2, GROUP6, true);
verify(client, table1, 2);
@@ -368,9 +345,6 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
try (final AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP8,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
createTable(client, tableName, "cs8");
writeData(client, tableName);
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
index c692e7fc86..4d1cf1b730 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.test.compaction;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP4;
-import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP5;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
@@ -31,16 +30,22 @@ import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.ro
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Collections;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
@@ -51,11 +56,12 @@ import
org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
@@ -69,22 +75,14 @@ public class ExternalCompaction_2_IT extends
SharedMiniClusterBase {
@BeforeAll
public static void beforeTests() throws Exception {
startMiniClusterWithConfig(new ExternalCompaction2Config());
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- // The ExternalDoNothingCompactor needs to be restarted between tests
getCluster().getClusterControl().stop(ServerType.COMPACTOR);
-
getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups();
+ getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
+ ExternalDoNothingCompactor.class);
}
@Test
public void testSplitCancelsExternalCompaction() throws Exception {
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
- ExternalDoNothingCompactor.class);
-
String table1 = this.getUniqueNames(1)[0];
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
@@ -133,10 +131,6 @@ public class ExternalCompaction_2_IT extends
SharedMiniClusterBase {
@Test
public void testUserCompactionCancellation() throws Exception {
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP3,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
- ExternalDoNothingCompactor.class);
-
String table1 = this.getUniqueNames(1)[0];
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
@@ -170,10 +164,6 @@ public class ExternalCompaction_2_IT extends
SharedMiniClusterBase {
@Test
public void testDeleteTableCancelsUserExternalCompaction() throws Exception {
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP4,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
- ExternalDoNothingCompactor.class);
-
String table1 = this.getUniqueNames(1)[0];
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
@@ -202,45 +192,59 @@ public class ExternalCompaction_2_IT extends
SharedMiniClusterBase {
@Test
public void testDeleteTableCancelsExternalCompaction() throws Exception {
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP5,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
- ExternalDoNothingCompactor.class);
-
String table1 = this.getUniqueNames(1)[0];
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
createTable(client, table1, "cs5");
+
+ ServerContext ctx = getCluster().getServerContext();
+ TableId tid = ctx.getTableId(table1);
+
// set compaction ratio to 1 so that majc occurs naturally, not user
compaction
// user compaction blocks delete
client.tableOperations().setProperty(table1,
Property.TABLE_MAJC_RATIO.toString(), "1.0");
- // cause multiple rfiles to be created
- writeData(client, table1);
- writeData(client, table1);
- writeData(client, table1);
- writeData(client, table1);
-
- TableId tid = getCluster().getServerContext().getTableId(table1);
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ Runnable r = () -> {
+ try {
+ // cause multiple rfiles to be created
+ latch.countDown();
+ writeData(client, table1);
+ writeData(client, table1);
+ writeData(client, table1);
+ writeData(client, table1);
+ } catch (TableNotFoundException | AccumuloException |
AccumuloSecurityException e) {
+ error.set(e);
+ }
+ };
+ Thread t = new Thread(r);
+ t.start();
+ latch.await();
// Wait for the compaction to start by waiting for 1 external compaction
column
- Set<ExternalCompactionId> ecids = ExternalCompactionTestUtils
-
.waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid);
+ Set<ExternalCompactionId> ecids =
+
ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids(ctx, tid);
// Confirm that this ECID shows up in RUNNING set
- int matches = ExternalCompactionTestUtils
- .confirmCompactionRunning(getCluster().getServerContext(), ecids);
+ int matches = ExternalCompactionTestUtils.confirmCompactionRunning(ctx,
ecids);
assertTrue(matches > 0);
client.tableOperations().delete(table1);
- confirmCompactionCompleted(getCluster().getServerContext(), ecids,
- TCompactionState.CANCELLED);
+ LoggerFactory.getLogger(getClass()).debug("Table deleted.");
+
+ confirmCompactionCompleted(ctx, ecids, TCompactionState.CANCELLED);
+
+ LoggerFactory.getLogger(getClass()).debug("Confirmed compaction
cancelled.");
- TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets().forTable(tid)
- .fetch(ColumnType.ECOMP).build();
+ TabletsMetadata tm =
+
ctx.getAmple().readTablets().forTable(tid).fetch(ColumnType.PREV_ROW).build();
assertEquals(0, tm.stream().count());
tm.close();
+ t.join();
+ assertNull(error.get());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
index d3de651332..52da17e009 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.test.compaction;
-import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
@@ -32,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -47,17 +47,22 @@ import
org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
-import org.junit.jupiter.api.AfterEach;
+import org.apache.thrift.transport.TTransportException;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import com.google.common.net.HostAndPort;
+
public class ExternalCompaction_3_IT extends SharedMiniClusterBase {
public static class ExternalCompaction3Config implements
MiniClusterConfigurationCallback {
@@ -70,22 +75,15 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
@BeforeAll
public static void beforeTests() throws Exception {
startMiniClusterWithConfig(new ExternalCompaction3Config());
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- // The ExternalDoNothingCompactor needs to be restarted between tests
getCluster().getClusterControl().stop(ServerType.COMPACTOR);
-
getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups();
+ getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
+ ExternalDoNothingCompactor.class);
}
@Test
+ @Disabled // ELASTICITY_TODO: Merges are broken currently
public void testMergeCancelsExternalCompaction() throws Exception {
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
- ExternalDoNothingCompactor.class);
-
String table1 = this.getUniqueNames(1)[0];
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
@@ -143,10 +141,6 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
@Test
public void testCoordinatorRestartsDuringCompaction() throws Exception {
-
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP2,
1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
- ExternalDoNothingCompactor.class);
-
String table1 = this.getUniqueNames(1)[0];
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
@@ -167,13 +161,21 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
// Restart the Manager while the compaction is running
getCluster().getClusterControl().start(ServerType.MANAGER);
+ ServerContext ctx = getCluster().getServerContext();
+
// Confirm compaction is still running
int matches = 0;
while (matches == 0) {
TExternalCompactionList running = null;
while (running == null) {
try {
- running = getRunningCompactions(getCluster().getServerContext());
+ Optional<HostAndPort> coordinatorHost =
+ ExternalCompactionUtil.findCompactionCoordinator(ctx);
+ if (coordinatorHost.isEmpty()) {
+ throw new TTransportException(
+ "Unable to get CompactionCoordinator address from
ZooKeeper");
+ }
+ running = getRunningCompactions(ctx, coordinatorHost);
} catch (TException t) {
running = null;
Thread.sleep(2000);