This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 8fddfb6b63 Execute compaction configurer plugin set on table (#3538)
8fddfb6b63 is described below

commit 8fddfb6b63e244a00233411f4683fec817ec6f47
Author: Keith Turner <[email protected]>
AuthorDate: Fri Jun 30 12:15:34 2023 -0400

    Execute compaction configurer plugin set on table (#3538)
    
    Any compaction initiated in the manager should now execute a compaction
    configurer plugin if its set on the table.
    
    Fixes #3468
---
 .../server/compaction/CompactionPluginUtils.java   | 19 +++++--
 .../coordinator/CompactionCoordinator.java         | 46 ++++++++--------
 .../accumulo/test/functional/CompactionIT.java     | 61 ++++++++++++++++++----
 3 files changed, 89 insertions(+), 37 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
index 81664b7a7f..6f7529948f 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
@@ -151,16 +151,27 @@ public class CompactionPluginUtils {
         .collect(Collectors.toSet());
   }
 
-  public static Map<String,String> computeOverrides(CompactionConfig 
compactionConfig,
+  public static Map<String,String> computeOverrides(Optional<CompactionConfig> 
compactionConfig,
       ServerContext context, KeyExtent extent, Set<CompactableFile> files) {
 
-    if (!UserCompactionUtils.isDefault(compactionConfig.getConfigurer())) {
+    if (compactionConfig.isPresent()
+        && 
!UserCompactionUtils.isDefault(compactionConfig.orElseThrow().getConfigurer())) 
{
       return CompactionPluginUtils.computeOverrides(context, extent, files,
-          compactionConfig.getConfigurer());
+          compactionConfig.orElseThrow().getConfigurer());
     }
 
-    return null;
+    var tableConf = context.getTableConfiguration(extent.tableId());
 
+    var configurorClass = tableConf.get(Property.TABLE_COMPACTION_CONFIGURER);
+    if (configurorClass == null || configurorClass.isBlank()) {
+      return Map.of();
+    }
+
+    var opts =
+        
tableConf.getAllPropertiesWithPrefixStripped(Property.TABLE_COMPACTION_CONFIGURER_OPTS);
+
+    return CompactionPluginUtils.computeOverrides(context, extent, files,
+        new PluginConfig(configurorClass, opts));
   }
 
   public static Map<String,String> computeOverrides(ServerContext context, 
KeyExtent extent,
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index a5ab6dc1be..2c0bc24000 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -45,7 +45,6 @@ import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
@@ -471,26 +470,13 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
   TExternalCompactionJob createThriftJob(String externalCompactionId,
       ExternalCompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob) {
 
-    List<IteratorSetting> iters = List.of();
+    Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);
 
-    Map<String,String> overrides = null;
+    Map<String,String> overrides = 
CompactionPluginUtils.computeOverrides(compactionConfig, ctx,
+        metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles());
 
-    if (metaJob.getJob().getKind() == CompactionKind.USER) {
-      try {
-        Pair<Long,CompactionConfig> cconf =
-            CompactionConfigStorage.getCompactionID(ctx, 
metaJob.getTabletMetadata().getExtent());
-        if (cconf != null) {
-          iters = cconf.getSecond().getIterators();
-
-          overrides = 
CompactionPluginUtils.computeOverrides(cconf.getSecond(), ctx,
-              metaJob.getTabletMetadata().getExtent(), 
metaJob.getJob().getFiles());
-        }
-      } catch (KeeperException.NoNodeException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    IteratorConfig iteratorSettings = 
SystemIteratorUtil.toIteratorConfig(iters);
+    IteratorConfig iteratorSettings = SystemIteratorUtil
+        
.toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of()));
 
     var files = ecm.getJobFiles().stream().map(storedTabletFile -> {
       var dfv = 
metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
@@ -498,10 +484,6 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
           dfv.getNumEntries(), dfv.getTime());
     }).collect(Collectors.toList());
 
-    if (overrides == null) {
-      overrides = Map.of();
-    }
-
     return new TExternalCompactionJob(externalCompactionId,
         metaJob.getTabletMetadata().getExtent().toThrift(), files, 
iteratorSettings,
         ecm.getCompactTmpName().getNormalizedPathStr(), 
ecm.getPropagateDeletes(),
@@ -509,6 +491,24 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
         ecm.getCompactionId() == null ? 0 : ecm.getCompactionId(), overrides);
   }
 
+  private Optional<CompactionConfig> 
getCompactionConfig(CompactionJobQueues.MetaJob metaJob) {
+    Optional<CompactionConfig> compactionConfig = Optional.empty();
+
+    if (metaJob.getJob().getKind() == CompactionKind.USER
+        || metaJob.getJob().getKind() == CompactionKind.SELECTOR) {
+      try {
+        Pair<Long,CompactionConfig> cconf =
+            CompactionConfigStorage.getCompactionID(ctx, 
metaJob.getTabletMetadata().getExtent());
+        if (cconf != null) {
+          compactionConfig = Optional.of(cconf.getSecond());
+        }
+      } catch (KeeperException.NoNodeException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return compactionConfig;
+  }
+
   /**
    * Compactor calls compactionCompleted passing in the CompactionStats
    *
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 0cb24e691f..c9992b3ccd 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -30,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
@@ -85,7 +86,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
-import org.bouncycastle.util.Arrays;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -423,15 +423,7 @@ public class CompactionIT extends AccumuloClusterHarness {
       client.tableOperations().create(tableName, ntc);
 
       byte[] data = new byte[100000];
-      Arrays.fill(data, (byte) 65);
-      try (var writer = client.createBatchWriter(tableName)) {
-        for (int row = 0; row < 10; row++) {
-          Mutation m = new Mutation(row + "");
-          m.at().family("big").qualifier("stuff").put(data);
-          writer.addMutation(m);
-        }
-      }
-      client.tableOperations().flush(tableName, null, null, true);
+      generateConfigurerTestData(tableName, client, data);
 
       // without compression, expect file to be large
       long sizes = CompactionExecutorIT.getFileSizes(client, tableName);
@@ -459,6 +451,55 @@ public class CompactionIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void testConfigurerSetOnTable() throws Exception {
+    String tableName = this.getUniqueNames(1)[0];
+
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      byte[] data = new byte[100000];
+
+      Map<String,
+          String> props = 
Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none",
+              Property.TABLE_COMPACTION_CONFIGURER.getKey(), 
CompressionConfigurer.class.getName(),
+              Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey()
+                  + CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE,
+              "gz", Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey()
+                  + CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD,
+              "" + data.length);
+      NewTableConfiguration ntc = new 
NewTableConfiguration().setProperties(props);
+      client.tableOperations().create(tableName, ntc);
+
+      generateConfigurerTestData(tableName, client, data);
+
+      // without compression, expect file to be large
+      long sizes = CompactionExecutorIT.getFileSizes(client, tableName);
+      assertTrue(sizes > data.length * 10 && sizes < data.length * 11,
+          "Unexpected files sizes : " + sizes);
+
+      client.tableOperations().compact(tableName, new 
CompactionConfig().setWait(true));
+
+      // after compacting with compression, expect small file
+      sizes = CompactionExecutorIT.getFileSizes(client, tableName);
+      assertTrue(sizes < data.length,
+          "Unexpected files sizes: data: " + data.length + ", file:" + sizes);
+
+    }
+  }
+
+  private static void generateConfigurerTestData(String tableName, 
AccumuloClient client,
+      byte[] data) throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException {
+    Arrays.fill(data, (byte) 65);
+    try (var writer = client.createBatchWriter(tableName)) {
+      for (int row = 0; row < 10; row++) {
+        Mutation m = new Mutation(row + "");
+        m.at().family("big").qualifier("stuff").put(data);
+        writer.addMutation(m);
+      }
+    }
+    client.tableOperations().flush(tableName, null, null, true);
+  }
+
   @Test
   public void testSuccessfulCompaction() throws Exception {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {

Reply via email to