This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new eba5059  ConcurrentDeleteTableIT bug fix and improvements (#2304)
eba5059 is described below

commit eba505922225dbdac4f2d9d7c26a5baaf3d851b6
Author: Dom G <47725857+domgargu...@users.noreply.github.com>
AuthorDate: Thu Oct 7 10:52:02 2021 -0400

    ConcurrentDeleteTableIT bug fix and improvements (#2304)
    
    * Catch expected exceptions causing test to be flaky
    * Add exception message constants
    * Refactoring of test
---
 .../core/clientImpl/TableOperationsImpl.java       |   4 +-
 .../manager/tableOps/compact/CompactionDriver.java |   7 +-
 .../tableOps/compact/CompactionDriverTest.java     |   5 +-
 .../accumulo/test/functional/CompactionIT.java     |   5 +-
 .../test/functional/ConcurrentDeleteTableIT.java   | 154 +++++++++++----------
 .../org/apache/accumulo/test/util/SlowOps.java     |   3 +-
 6 files changed, 94 insertions(+), 84 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index ecab4df..a6f5370 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -156,6 +156,8 @@ import com.google.common.base.Preconditions;
 public class TableOperationsImpl extends TableOperationsHelper {
 
   public static final String CLONE_EXCLUDE_PREFIX = "!";
+  public static final String COMPACTION_CANCELED_MSG = "Compaction canceled";
+  public static final String TABLE_DELETED_MSG = "Table is being deleted";
 
   private static final Logger log = 
LoggerFactory.getLogger(TableOperations.class);
   private final ClientContext context;
@@ -1244,7 +1246,7 @@ public class TableOperationsImpl extends 
TableOperationsHelper {
         if (currentState != expectedState) {
           context.requireNotDeleted(tableId);
           if (currentState == TableState.DELETING)
-            throw new TableNotFoundException(tableId.canonical(), "", "Table 
is being deleted.");
+            throw new TableNotFoundException(tableId.canonical(), "", 
TABLE_DELETED_MSG);
           throw new AccumuloException("Unexpected table state " + tableId + " "
               + Tables.getTableState(context, tableId) + " != " + 
expectedState);
         }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 24a5f62..b80494b 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -24,6 +24,7 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 
 import org.apache.accumulo.core.Constants;
 import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
 import org.apache.accumulo.core.clientImpl.Tables;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
@@ -83,7 +84,8 @@ class CompactionDriver extends ManagerRepo {
     if (Long.parseLong(new String(zoo.getData(zCancelID))) >= compactId) {
       // compaction was canceled
       throw new AcceptableThriftTableOperationException(tableId.canonical(), 
null,
-          TableOperation.COMPACT, TableOperationExceptionType.OTHER, 
"Compaction canceled");
+          TableOperation.COMPACT, TableOperationExceptionType.OTHER,
+          TableOperationsImpl.COMPACTION_CANCELED_MSG);
     }
 
     String deleteMarkerPath =
@@ -91,7 +93,8 @@ class CompactionDriver extends ManagerRepo {
     if (zoo.exists(deleteMarkerPath)) {
       // table is being deleted
       throw new AcceptableThriftTableOperationException(tableId.canonical(), 
null,
-          TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Table is 
being deleted");
+          TableOperation.COMPACT, TableOperationExceptionType.OTHER,
+          TableOperationsImpl.TABLE_DELETED_MSG);
     }
 
     MapCounter<TServerInstance> serversToFlush = new MapCounter<>();
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
index 1ccfc9f..a4280ef 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import java.util.UUID;
 
 import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.data.NamespaceId;
@@ -66,7 +67,7 @@ public class CompactionDriverTest {
     } catch (AcceptableThriftTableOperationException e) {
       if (e.getTableId().equals(tableId.toString()) && 
e.getOp().equals(TableOperation.COMPACT)
           && e.getType().equals(TableOperationExceptionType.OTHER)
-          && e.getDescription().equals("Compaction canceled")) {
+          && 
e.getDescription().equals(TableOperationsImpl.COMPACTION_CANCELED_MSG)) {
         // success
       } else {
         fail("Unexpected error thrown: " + e.getMessage());
@@ -110,7 +111,7 @@ public class CompactionDriverTest {
     } catch (AcceptableThriftTableOperationException e) {
       if (e.getTableId().equals(tableId.toString()) && 
e.getOp().equals(TableOperation.COMPACT)
           && e.getType().equals(TableOperationExceptionType.OTHER)
-          && e.getDescription().equals("Table is being deleted")) {
+          && e.getDescription().equals(TableOperationsImpl.TABLE_DELETED_MSG)) 
{
         // success
       } else {
         fail("Unexpected error thrown: " + e.getMessage());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 36cf786..b0c6e50 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.client.admin.PluginConfig;
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
 import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
+import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -273,7 +274,7 @@ public class CompactionIT extends AccumuloClusterHarness {
       t.join();
       Exception e = error.get();
       assertNotNull(e);
-      assertEquals("Compaction canceled", e.getMessage());
+      assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, 
e.getMessage());
     }
   }
 
@@ -314,7 +315,7 @@ public class CompactionIT extends AccumuloClusterHarness {
       t.join();
       Exception e = error.get();
       assertNotNull(e);
-      assertEquals("Compaction canceled", e.getMessage());
+      assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, 
e.getMessage());
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index f02acbd..ada7cb5 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -45,6 +45,8 @@ import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
+import 
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -53,6 +55,9 @@ import org.junit.Test;
 
 public class ConcurrentDeleteTableIT extends AccumuloClusterHarness {
 
+  private final NewTableConfiguration ntc = new 
NewTableConfiguration().withSplits(createSplits());
+  private final int NUM_TABLES = 2;
+
   @Override
   protected int defaultTimeoutSeconds() {
     return 7 * 60;
@@ -62,47 +67,42 @@ public class ConcurrentDeleteTableIT extends 
AccumuloClusterHarness {
   public void testConcurrentDeleteTablesOps() throws Exception {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 
-      String[] tables = getUniqueNames(2);
+      String[] tables = getUniqueNames(NUM_TABLES);
 
-      TreeSet<Text> splits = createSplits();
-      NewTableConfiguration ntc = new 
NewTableConfiguration().withSplits(splits);
+      int numDeleteOps = 20;
 
-      ExecutorService es = Executors.newFixedThreadPool(20);
+      ExecutorService es = Executors.newFixedThreadPool(numDeleteOps);
 
       int count = 0;
       for (final String table : tables) {
         c.tableOperations().create(table, ntc);
         writeData(c, table);
-        if (count == 1) {
+        // flush last table
+        if (count == tables.length - 1) {
           c.tableOperations().flush(table, null, null, true);
         }
         count++;
 
-        int numDeleteOps = 20;
         final CountDownLatch cdl = new CountDownLatch(numDeleteOps);
 
         List<Future<?>> futures = new ArrayList<>();
 
         for (int i = 0; i < numDeleteOps; i++) {
-          Future<?> future = es.submit(new Runnable() {
-
-            @Override
-            public void run() {
-              try {
-                cdl.countDown();
-                cdl.await();
-                c.tableOperations().delete(table);
-              } catch (TableNotFoundException e) {
-                // expected
-              } catch (InterruptedException | AccumuloException | 
AccumuloSecurityException e) {
-                throw new RuntimeException(e);
-              }
+          futures.add(es.submit(() -> {
+            try {
+              cdl.countDown();
+              cdl.await();
+              c.tableOperations().delete(table);
+            } catch (TableNotFoundException e) {
+              // expected
+            } catch (InterruptedException | AccumuloException | 
AccumuloSecurityException e) {
+              throw new RuntimeException(e);
             }
-          });
-
-          futures.add(future);
+          }));
         }
 
+        assertEquals(numDeleteOps, futures.size());
+
         for (Future<?> future : futures) {
           future.get();
         }
@@ -121,49 +121,11 @@ public class ConcurrentDeleteTableIT extends 
AccumuloClusterHarness {
     }
   }
 
-  private TreeSet<Text> createSplits() {
-    TreeSet<Text> splits = new TreeSet<>();
-
-    for (int i = 0; i < 1000; i++) {
-      Text split = new Text(String.format("%09x", i * 100000));
-      splits.add(split);
-    }
-    return splits;
-  }
-
-  private abstract static class DelayedTableOp implements Runnable {
-    private CountDownLatch cdl;
-
-    DelayedTableOp(CountDownLatch cdl) {
-      this.cdl = cdl;
-    }
-
-    @Override
-    public void run() {
-      try {
-        cdl.countDown();
-        cdl.await();
-        Thread.sleep(10);
-        doTableOp();
-      } catch (TableNotFoundException | TableOfflineException e) {
-        // expected
-      } catch (RuntimeException e) {
-        throw e;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    protected abstract void doTableOp() throws Exception;
-  }
-
   @Test
   public void testConcurrentFateOpsWithDelete() throws Exception {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
-      String[] tables = getUniqueNames(2);
 
-      TreeSet<Text> splits = createSplits();
-      NewTableConfiguration ntc = new 
NewTableConfiguration().withSplits(splits);
+      String[] tables = getUniqueNames(NUM_TABLES);
 
       int numOperations = 8;
 
@@ -173,7 +135,8 @@ public class ConcurrentDeleteTableIT extends 
AccumuloClusterHarness {
       for (final String table : tables) {
         c.tableOperations().create(table, ntc);
         writeData(c, table);
-        if (count == 1) {
+        // flush last table
+        if (count == tables.length - 1) {
           c.tableOperations().flush(table, null, null, true);
         }
         count++;
@@ -183,18 +146,15 @@ public class ConcurrentDeleteTableIT extends 
AccumuloClusterHarness {
 
         List<Future<?>> futures = new ArrayList<>();
 
-        futures.add(es.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              cdl.countDown();
-              cdl.await();
-              c.tableOperations().delete(table);
-            } catch (TableNotFoundException | TableOfflineException e) {
-              // expected
-            } catch (InterruptedException | AccumuloException | 
AccumuloSecurityException e) {
-              throw new RuntimeException(e);
-            }
+        futures.add(es.submit(() -> {
+          try {
+            cdl.countDown();
+            cdl.await();
+            c.tableOperations().delete(table);
+          } catch (TableNotFoundException | TableOfflineException e) {
+            // expected
+          } catch (InterruptedException | AccumuloException | 
AccumuloSecurityException e) {
+            throw new RuntimeException(e);
           }
         }));
 
@@ -269,12 +229,54 @@ public class ConcurrentDeleteTableIT extends 
AccumuloClusterHarness {
     }
   }
 
+  private abstract static class DelayedTableOp implements Runnable {
+    private final CountDownLatch cdl;
+
+    DelayedTableOp(CountDownLatch cdl) {
+      this.cdl = cdl;
+    }
+
+    @Override
+    public void run() {
+      try {
+        cdl.countDown();
+        cdl.await();
+        Thread.sleep(10);
+        doTableOp();
+      } catch (TableNotFoundException | TableOfflineException e) {
+        // expected
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        if (e.getCause().getClass().equals(ThriftTableOperationException.class)
+            && 
(e.getMessage().equals(TableOperationsImpl.COMPACTION_CANCELED_MSG)
+                || 
e.getMessage().equals(TableOperationsImpl.TABLE_DELETED_MSG))) {
+          // acceptable
+        } else {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    protected abstract void doTableOp() throws Exception;
+  }
+
+  private TreeSet<Text> createSplits() {
+    TreeSet<Text> splits = new TreeSet<>();
+
+    for (int i = 0; i < 1_000; i++) {
+      Text split = new Text(String.format("%09x", i * 100_000));
+      splits.add(split);
+    }
+    return splits;
+  }
+
   private void writeData(AccumuloClient c, String table)
       throws TableNotFoundException, MutationsRejectedException {
     try (BatchWriter bw = c.createBatchWriter(table)) {
       Random rand = new SecureRandom();
-      for (int i = 0; i < 1000; i++) {
-        Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 
1000)));
+      for (int i = 0; i < 1_000; i++) {
+        Mutation m = new Mutation(String.format("%09x", rand.nextInt(100_000 * 
1_000)));
         m.put("m", "order", "" + i);
         bw.addMutation(m);
       }
diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java 
b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
index b3178cb..d2abe80 100644
--- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.ActiveCompaction;
+import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -180,7 +181,7 @@ public class SlowOps {
           completed = true;
         } catch (Throwable ex) {
           // test cancels compaction on complete, so ignore it as an exception.
-          if (ex.getMessage().contains("Compaction canceled")) {
+          if 
(ex.getMessage().contains(TableOperationsImpl.COMPACTION_CANCELED_MSG)) {
             return;
           }
           log.info("Exception thrown while waiting for compaction - will 
retry", ex);

Reply via email to