This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new eba5059 ConcurrentDeleteTableIT bug fix and improvements (#2304) eba5059 is described below commit eba505922225dbdac4f2d9d7c26a5baaf3d851b6 Author: Dom G <47725857+domgargu...@users.noreply.github.com> AuthorDate: Thu Oct 7 10:52:02 2021 -0400 ConcurrentDeleteTableIT bug fix and improvements (#2304) * Catch expected exceptions causing test to be flaky * Add exception message constants * Refactoring of test --- .../core/clientImpl/TableOperationsImpl.java | 4 +- .../manager/tableOps/compact/CompactionDriver.java | 7 +- .../tableOps/compact/CompactionDriverTest.java | 5 +- .../accumulo/test/functional/CompactionIT.java | 5 +- .../test/functional/ConcurrentDeleteTableIT.java | 154 +++++++++++---------- .../org/apache/accumulo/test/util/SlowOps.java | 3 +- 6 files changed, 94 insertions(+), 84 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index ecab4df..a6f5370 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -156,6 +156,8 @@ import com.google.common.base.Preconditions; public class TableOperationsImpl extends TableOperationsHelper { public static final String CLONE_EXCLUDE_PREFIX = "!"; + public static final String COMPACTION_CANCELED_MSG = "Compaction canceled"; + public static final String TABLE_DELETED_MSG = "Table is being deleted"; private static final Logger log = LoggerFactory.getLogger(TableOperations.class); private final ClientContext context; @@ -1244,7 +1246,7 @@ public class TableOperationsImpl extends TableOperationsHelper { if (currentState != expectedState) { context.requireNotDeleted(tableId); if (currentState == TableState.DELETING) - throw new TableNotFoundException(tableId.canonical(), "", "Table is being deleted."); + throw new TableNotFoundException(tableId.canonical(), "", TABLE_DELETED_MSG); throw new AccumuloException("Unexpected table state " + tableId + " " + Tables.getTableState(context, tableId) + " != " + expectedState); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 24a5f62..b80494b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.clientImpl.Tables; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; @@ -83,7 +84,8 @@ class CompactionDriver extends ManagerRepo { if (Long.parseLong(new String(zoo.getData(zCancelID))) >= compactId) { // compaction was canceled throw new AcceptableThriftTableOperationException(tableId.canonical(), null, - TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled"); + TableOperation.COMPACT, TableOperationExceptionType.OTHER, + TableOperationsImpl.COMPACTION_CANCELED_MSG); } String deleteMarkerPath = @@ -91,7 +93,8 @@ class CompactionDriver extends ManagerRepo { if (zoo.exists(deleteMarkerPath)) { // table is being deleted throw new AcceptableThriftTableOperationException(tableId.canonical(), null, - TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Table is being deleted"); + TableOperation.COMPACT, TableOperationExceptionType.OTHER, + TableOperationsImpl.TABLE_DELETED_MSG); } MapCounter<TServerInstance> serversToFlush = new MapCounter<>(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java index 1ccfc9f..a4280ef 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.util.UUID; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.data.NamespaceId; @@ -66,7 +67,7 @@ public class CompactionDriverTest { } catch (AcceptableThriftTableOperationException e) { if (e.getTableId().equals(tableId.toString()) && e.getOp().equals(TableOperation.COMPACT) && e.getType().equals(TableOperationExceptionType.OTHER) - && e.getDescription().equals("Compaction canceled")) { + && e.getDescription().equals(TableOperationsImpl.COMPACTION_CANCELED_MSG)) { // success } else { fail("Unexpected error thrown: " + e.getMessage()); @@ -110,7 +111,7 @@ public class CompactionDriverTest { } catch (AcceptableThriftTableOperationException e) { if (e.getTableId().equals(tableId.toString()) && e.getOp().equals(TableOperation.COMPACT) && e.getType().equals(TableOperationExceptionType.OTHER) - && e.getDescription().equals("Table is being deleted")) { + && e.getDescription().equals(TableOperationsImpl.TABLE_DELETED_MSG)) { // success } else { fail("Unexpected error thrown: " + e.getMessage()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 36cf786..b0c6e50 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -50,6 +50,7 @@ import org.apache.accumulo.core.client.admin.PluginConfig; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; +import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -273,7 +274,7 @@ public class CompactionIT extends AccumuloClusterHarness { t.join(); Exception e = error.get(); assertNotNull(e); - assertEquals("Compaction canceled", e.getMessage()); + assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage()); } } @@ -314,7 +315,7 @@ public class CompactionIT extends AccumuloClusterHarness { t.join(); Exception e = error.get(); assertNotNull(e); - assertEquals("Compaction canceled", e.getMessage()); + assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java index f02acbd..ada7cb5 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java @@ -45,6 +45,8 @@ import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.TableOperationsImpl; +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterHarness; @@ -53,6 +55,9 @@ import org.junit.Test; public class ConcurrentDeleteTableIT extends AccumuloClusterHarness { + private final NewTableConfiguration ntc = new NewTableConfiguration().withSplits(createSplits()); + private final int NUM_TABLES = 2; + @Override protected int defaultTimeoutSeconds() { return 7 * 60; @@ -62,47 +67,42 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterHarness { public void testConcurrentDeleteTablesOps() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - String[] tables = getUniqueNames(2); + String[] tables = getUniqueNames(NUM_TABLES); - TreeSet<Text> splits = createSplits(); - NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits); + int numDeleteOps = 20; - ExecutorService es = Executors.newFixedThreadPool(20); + ExecutorService es = Executors.newFixedThreadPool(numDeleteOps); int count = 0; for (final String table : tables) { c.tableOperations().create(table, ntc); writeData(c, table); - if (count == 1) { + // flush last table + if (count == tables.length - 1) { c.tableOperations().flush(table, null, null, true); } count++; - int numDeleteOps = 20; final CountDownLatch cdl = new CountDownLatch(numDeleteOps); List<Future<?>> futures = new ArrayList<>(); for (int i = 0; i < numDeleteOps; i++) { - Future<?> future = es.submit(new Runnable() { - - @Override - public void run() { - try { - cdl.countDown(); - cdl.await(); - c.tableOperations().delete(table); - } catch (TableNotFoundException e) { - // expected - } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { - throw new RuntimeException(e); - } + futures.add(es.submit(() -> { + try { + cdl.countDown(); + cdl.await(); + c.tableOperations().delete(table); + } catch (TableNotFoundException e) { + // expected + } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); } - }); - - futures.add(future); + })); } + assertEquals(numDeleteOps, futures.size()); + for (Future<?> future : futures) { future.get(); } @@ -121,49 +121,11 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterHarness { } } - private TreeSet<Text> createSplits() { - TreeSet<Text> splits = new TreeSet<>(); - - for (int i = 0; i < 1000; i++) { - Text split = new Text(String.format("%09x", i * 100000)); - splits.add(split); - } - return splits; - } - - private abstract static class DelayedTableOp implements Runnable { - private CountDownLatch cdl; - - DelayedTableOp(CountDownLatch cdl) { - this.cdl = cdl; - } - - @Override - public void run() { - try { - cdl.countDown(); - cdl.await(); - Thread.sleep(10); - doTableOp(); - } catch (TableNotFoundException | TableOfflineException e) { - // expected - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - protected abstract void doTableOp() throws Exception; - } - @Test public void testConcurrentFateOpsWithDelete() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - String[] tables = getUniqueNames(2); - TreeSet<Text> splits = createSplits(); - NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits); + String[] tables = getUniqueNames(NUM_TABLES); int numOperations = 8; @@ -173,7 +135,8 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterHarness { for (final String table : tables) { c.tableOperations().create(table, ntc); writeData(c, table); - if (count == 1) { + // flush last table + if (count == tables.length - 1) { c.tableOperations().flush(table, null, null, true); } count++; @@ -183,18 +146,15 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterHarness { List<Future<?>> futures = new ArrayList<>(); - futures.add(es.submit(new Runnable() { - @Override - public void run() { - try { - cdl.countDown(); - cdl.await(); - c.tableOperations().delete(table); - } catch (TableNotFoundException | TableOfflineException e) { - // expected - } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { - throw new RuntimeException(e); - } + futures.add(es.submit(() -> { + try { + cdl.countDown(); + cdl.await(); + c.tableOperations().delete(table); + } catch (TableNotFoundException | TableOfflineException e) { + // expected + } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); } })); @@ -269,12 +229,54 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterHarness { } } + private abstract static class DelayedTableOp implements Runnable { + private final CountDownLatch cdl; + + DelayedTableOp(CountDownLatch cdl) { + this.cdl = cdl; + } + + @Override + public void run() { + try { + cdl.countDown(); + cdl.await(); + Thread.sleep(10); + doTableOp(); + } catch (TableNotFoundException | TableOfflineException e) { + // expected + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + if (e.getCause().getClass().equals(ThriftTableOperationException.class) + && (e.getMessage().equals(TableOperationsImpl.COMPACTION_CANCELED_MSG) + || e.getMessage().equals(TableOperationsImpl.TABLE_DELETED_MSG))) { + // acceptable + } else { + throw new RuntimeException(e); + } + } + } + + protected abstract void doTableOp() throws Exception; + } + + private TreeSet<Text> createSplits() { + TreeSet<Text> splits = new TreeSet<>(); + + for (int i = 0; i < 1_000; i++) { + Text split = new Text(String.format("%09x", i * 100_000)); + splits.add(split); + } + return splits; + } + private void writeData(AccumuloClient c, String table) throws TableNotFoundException, MutationsRejectedException { try (BatchWriter bw = c.createBatchWriter(table)) { Random rand = new SecureRandom(); - for (int i = 0; i < 1000; i++) { - Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000))); + for (int i = 0; i < 1_000; i++) { + Mutation m = new Mutation(String.format("%09x", rand.nextInt(100_000 * 1_000))); m.put("m", "order", "" + i); bw.addMutation(m); } diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java index b3178cb..d2abe80 100644 --- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java +++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveCompaction; +import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -180,7 +181,7 @@ public class SlowOps { completed = true; } catch (Throwable ex) { // test cancels compaction on complete, so ignore it as an exception. - if (ex.getMessage().contains("Compaction canceled")) { + if (ex.getMessage().contains(TableOperationsImpl.COMPACTION_CANCELED_MSG)) { return; } log.info("Exception thrown while waiting for compaction - will retry", ex);