This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 8ef963c  Improved IT and added sanity check
8ef963c is described below

commit 8ef963cbd923dbd22c21329a68601a27d237d1a7
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Apr 13 10:37:20 2021 -0400

    Improved IT and added sanity check
---
 .../accumulo/tserver/ThriftClientHandler.java      |   7 +-
 .../tserver/compactions/CompactionManager.java     |  14 +--
 .../apache/accumulo/test/ExternalCompactionIT.java | 120 +++++++++++++++------
 3 files changed, 100 insertions(+), 41 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index abb9968..2ac0d65 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -1703,8 +1703,8 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
     }
 
     server.getCompactionManager().commitExternalCompaction(
-        ExternalCompactionId.of(externalCompactionId), extent, 
server.getOnlineTablets(), fileSize,
-        entries);
+        ExternalCompactionId.of(externalCompactionId), 
KeyExtent.fromThrift(extent),
+        server.getOnlineTablets(), fileSize, entries);
   }
 
   @Override
@@ -1716,7 +1716,8 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
     }
 
     server.getCompactionManager().externalCompactionFailed(
-        ExternalCompactionId.of(externalCompactionId), extent, 
server.getOnlineTablets());
+        ExternalCompactionId.of(externalCompactionId), 
KeyExtent.fromThrift(extent),
+        server.getOnlineTablets());
   }
 
   @Override
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index 44f0ccf..8228d12 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -33,7 +33,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
@@ -50,6 +49,7 @@ import org.apache.accumulo.tserver.tablet.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
 public class CompactionManager {
@@ -447,12 +447,12 @@ public class CompactionManager {
   }
 
   public void commitExternalCompaction(ExternalCompactionId extCompactionId,
-      TKeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long 
fileSize,
+      KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long 
fileSize,
       long entries) {
     KeyExtent extent = runningExternalCompactions.get(extCompactionId);
-    // CBUG Use extentCompacted to perform additional validation that the 
extent has not
-    // merged, split, or otherwise changed.
     if (extent != null) {
+      Preconditions.checkState(extent.equals(extentCompacted),
+          "Unexpected extent seen on compaction commit %s %s", extent, 
extentCompacted);
       Tablet tablet = currentTablets.get(extent);
       if (tablet != null) {
         tablet.asCompactable().commitExternalCompaction(extCompactionId, 
fileSize, entries);
@@ -469,12 +469,12 @@ public class CompactionManager {
     return (null != extent && extent.compareTo(ke) == 0);
   }
 
-  public void externalCompactionFailed(ExternalCompactionId ecid, TKeyExtent 
extentCompacted,
+  public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent 
extentCompacted,
       Map<KeyExtent,Tablet> currentTablets) {
-    // CBUG Use extentCompacted to perform additional validation that the 
extent has not
-    // merged, split, or otherwise changed.
     KeyExtent extent = runningExternalCompactions.get(ecid);
     if (extent != null) {
+      Preconditions.checkState(extent.equals(extentCompacted),
+          "Unexpected extent seen on compaction commit %s %s", extent, 
extentCompacted);
       Tablet tablet = currentTablets.get(extent);
       if (tablet != null) {
         tablet.asCompactable().externalCompactionFailed(ecid);
diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 1b47f58..be208a1 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -28,9 +28,13 @@ import org.apache.accumulo.compactor.Compactor;
 import org.apache.accumulo.coordinator.CompactionCoordinator;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.data.Key;
@@ -38,6 +42,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.Filter;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
@@ -57,10 +62,16 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
         DefaultCompactionPlanner.class.getName());
     
cfg.setProperty("tserver.compaction.major.service.cs1.planner.opts.executors",
         "[{'name':'all','externalQueue':'DCQ1'}]");
+    cfg.setProperty("tserver.compaction.major.service.cs2.planner",
+        DefaultCompactionPlanner.class.getName());
+    
cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors",
+        "[{'name':'all','externalQueue':'DCQ2'}]");
   }
 
   public static class TestFilter extends Filter {
 
+    int modulus = 1;
+
     @Override
     public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
         IteratorEnvironment env) throws IOException {
@@ -71,11 +82,22 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
       Preconditions.checkArgument(!cienv.getQueueName().isEmpty());
       Preconditions
           .checkArgument(options.getOrDefault("expectedQ", 
"").equals(cienv.getQueueName()));
+
+      Preconditions.checkArgument(cienv.isFullMajorCompaction());
+      Preconditions.checkArgument(cienv.isUserCompaction());
+      Preconditions.checkArgument(cienv.getIteratorScope() == 
IteratorScope.majc);
+      Preconditions.checkArgument(!cienv.isSamplingEnabled());
+
+      // if the init function is never called at all, then not setting the 
modulus option should
+      // cause the test to fail
+      if (options.containsKey("modulus")) {
+        modulus = Integer.parseInt(options.get("modulus"));
+      }
     }
 
     @Override
     public boolean accept(Key k, Value v) {
-      return Integer.parseInt(v.toString()) % 2 == 0;
+      return Integer.parseInt(v.toString()) % modulus == 0;
     }
 
   }
@@ -83,44 +105,80 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
   @Test
   public void testExternalCompaction() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
-      Map<String,String> props =
-          Map.of("table.compaction.dispatcher", 
SimpleCompactionDispatcher.class.getName(),
-              "table.compaction.dispatcher.opts.service", "cs1");
-      NewTableConfiguration ntc = new 
NewTableConfiguration().setProperties(props);
-
-      String tableName = "ectt";
 
-      client.tableOperations().create(tableName, ntc);
+      String table1 = "ectt1";
+      createTable(client, table1, "cs1");
 
-      try (BatchWriter bw = client.createBatchWriter(tableName)) {
-        for (int i = 0; i < 10; i++) {
-          Mutation m = new Mutation("r:" + i);
-          m.put("", "", "" + i);
-          bw.addMutation(m);
-        }
-      }
+      String table2 = "ectt2";
+      createTable(client, table2, "cs2");
 
-      client.tableOperations().flush(tableName);
+      wrtieData(client, table1);
+      wrtieData(client, table2);
 
       cluster.exec(Compactor.class, "-q", "DCQ1");
+      cluster.exec(Compactor.class, "-q", "DCQ2");
       cluster.exec(CompactionCoordinator.class);
 
-      IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
-      // make sure iterator options make it to compactor process
-      iterSetting.addOption("expectedQ", "DCQ1");
-      CompactionConfig config =
-          new 
CompactionConfig().setIterators(List.of(iterSetting)).setWait(true);
-      client.tableOperations().compact(tableName, config);
-
-      try (Scanner scanner = client.createScanner(tableName)) {
-        int count = 0;
-        for (Entry<Key,Value> entry : scanner) {
-          Assert.assertTrue(Integer.parseInt(entry.getValue().toString()) % 2 
== 0);
-          count++;
-        }
-
-        Assert.assertEquals(5, count);
+      compact(client, table1, 2, "DCQ1");
+      verify(client, table1, 2);
+
+      compact(client, table2, 3, "DCQ2");
+      verify(client, table2, 3);
+
+    }
+  }
+
+  private void verify(AccumuloClient client, String table1, int modulus)
+      throws TableNotFoundException, AccumuloSecurityException, 
AccumuloException {
+    try (Scanner scanner = client.createScanner(table1)) {
+      int count = 0;
+      for (Entry<Key,Value> entry : scanner) {
+        Assert.assertTrue(Integer.parseInt(entry.getValue().toString()) % 
modulus == 0);
+        count++;
+      }
+
+      int expectedCount = 0;
+      for (int i = 0; i < 10; i++) {
+        if (i % modulus == 0)
+          expectedCount++;
       }
+
+      Assert.assertEquals(expectedCount, count);
     }
   }
+
+  private void compact(AccumuloClient client, String table1, int modulus, 
String expectedQueue)
+      throws AccumuloSecurityException, TableNotFoundException, 
AccumuloException {
+    IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
+    // make sure iterator options make it to compactor process
+    iterSetting.addOption("expectedQ", expectedQueue);
+    iterSetting.addOption("modulus", modulus + "");
+    CompactionConfig config =
+        new 
CompactionConfig().setIterators(List.of(iterSetting)).setWait(true);
+    client.tableOperations().compact(table1, config);
+  }
+
+  private void createTable(AccumuloClient client, String tableName, String 
service)
+      throws Exception {
+    Map<String,String> props =
+        Map.of("table.compaction.dispatcher", 
SimpleCompactionDispatcher.class.getName(),
+            "table.compaction.dispatcher.opts.service", service);
+    NewTableConfiguration ntc = new 
NewTableConfiguration().setProperties(props);
+
+    client.tableOperations().create(tableName, ntc);
+
+  }
+
+  private void wrtieData(AccumuloClient client, String table1) throws 
MutationsRejectedException,
+      TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    try (BatchWriter bw = client.createBatchWriter(table1)) {
+      for (int i = 0; i < 10; i++) {
+        Mutation m = new Mutation("r:" + i);
+        m.put("", "", "" + i);
+        bw.addMutation(m);
+      }
+    }
+
+    client.tableOperations().flush(table1);
+  }
 }

Reply via email to