This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 8fddfb6b63 Execute compaction configurer plugin set on table (#3538)
8fddfb6b63 is described below
commit 8fddfb6b63e244a00233411f4683fec817ec6f47
Author: Keith Turner <[email protected]>
AuthorDate: Fri Jun 30 12:15:34 2023 -0400
Execute compaction configurer plugin set on table (#3538)
Any compaction initiated in the manager should now execute a compaction
configurer plugin if one is set on the table.
Fixes #3468
---
.../server/compaction/CompactionPluginUtils.java | 19 +++++--
.../coordinator/CompactionCoordinator.java | 46 ++++++++--------
.../accumulo/test/functional/CompactionIT.java | 61 ++++++++++++++++++----
3 files changed, 89 insertions(+), 37 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
index 81664b7a7f..6f7529948f 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
@@ -151,16 +151,27 @@ public class CompactionPluginUtils {
.collect(Collectors.toSet());
}
- public static Map<String,String> computeOverrides(CompactionConfig
compactionConfig,
+ public static Map<String,String> computeOverrides(Optional<CompactionConfig>
compactionConfig,
ServerContext context, KeyExtent extent, Set<CompactableFile> files) {
- if (!UserCompactionUtils.isDefault(compactionConfig.getConfigurer())) {
+ if (compactionConfig.isPresent()
+ &&
!UserCompactionUtils.isDefault(compactionConfig.orElseThrow().getConfigurer()))
{
return CompactionPluginUtils.computeOverrides(context, extent, files,
- compactionConfig.getConfigurer());
+ compactionConfig.orElseThrow().getConfigurer());
}
- return null;
+ var tableConf = context.getTableConfiguration(extent.tableId());
+ var configurorClass = tableConf.get(Property.TABLE_COMPACTION_CONFIGURER);
+ if (configurorClass == null || configurorClass.isBlank()) {
+ return Map.of();
+ }
+
+ var opts =
+
tableConf.getAllPropertiesWithPrefixStripped(Property.TABLE_COMPACTION_CONFIGURER_OPTS);
+
+ return CompactionPluginUtils.computeOverrides(context, extent, files,
+ new PluginConfig(configurorClass, opts));
}
public static Map<String,String> computeOverrides(ServerContext context,
KeyExtent extent,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index a5ab6dc1be..2c0bc24000 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -45,7 +45,6 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
@@ -471,26 +470,13 @@ public class CompactionCoordinator implements
CompactionCoordinatorService.Iface
TExternalCompactionJob createThriftJob(String externalCompactionId,
ExternalCompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob) {
- List<IteratorSetting> iters = List.of();
+ Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);
- Map<String,String> overrides = null;
+ Map<String,String> overrides =
CompactionPluginUtils.computeOverrides(compactionConfig, ctx,
+ metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles());
- if (metaJob.getJob().getKind() == CompactionKind.USER) {
- try {
- Pair<Long,CompactionConfig> cconf =
- CompactionConfigStorage.getCompactionID(ctx,
metaJob.getTabletMetadata().getExtent());
- if (cconf != null) {
- iters = cconf.getSecond().getIterators();
-
- overrides =
CompactionPluginUtils.computeOverrides(cconf.getSecond(), ctx,
- metaJob.getTabletMetadata().getExtent(),
metaJob.getJob().getFiles());
- }
- } catch (KeeperException.NoNodeException e) {
- throw new RuntimeException(e);
- }
- }
-
- IteratorConfig iteratorSettings =
SystemIteratorUtil.toIteratorConfig(iters);
+ IteratorConfig iteratorSettings = SystemIteratorUtil
+
.toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of()));
var files = ecm.getJobFiles().stream().map(storedTabletFile -> {
var dfv =
metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
@@ -498,10 +484,6 @@ public class CompactionCoordinator implements
CompactionCoordinatorService.Iface
dfv.getNumEntries(), dfv.getTime());
}).collect(Collectors.toList());
- if (overrides == null) {
- overrides = Map.of();
- }
-
return new TExternalCompactionJob(externalCompactionId,
metaJob.getTabletMetadata().getExtent().toThrift(), files,
iteratorSettings,
ecm.getCompactTmpName().getNormalizedPathStr(),
ecm.getPropagateDeletes(),
@@ -509,6 +491,24 @@ public class CompactionCoordinator implements
CompactionCoordinatorService.Iface
ecm.getCompactionId() == null ? 0 : ecm.getCompactionId(), overrides);
}
+ private Optional<CompactionConfig>
getCompactionConfig(CompactionJobQueues.MetaJob metaJob) {
+ Optional<CompactionConfig> compactionConfig = Optional.empty();
+
+ if (metaJob.getJob().getKind() == CompactionKind.USER
+ || metaJob.getJob().getKind() == CompactionKind.SELECTOR) {
+ try {
+ Pair<Long,CompactionConfig> cconf =
+ CompactionConfigStorage.getCompactionID(ctx,
metaJob.getTabletMetadata().getExtent());
+ if (cconf != null) {
+ compactionConfig = Optional.of(cconf.getSecond());
+ }
+ } catch (KeeperException.NoNodeException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return compactionConfig;
+ }
+
/**
* Compactor calls compactionCompleted passing in the CompactionStats
*
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 0cb24e691f..c9992b3ccd 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -30,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -85,7 +86,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
-import org.bouncycastle.util.Arrays;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -423,15 +423,7 @@ public class CompactionIT extends AccumuloClusterHarness {
client.tableOperations().create(tableName, ntc);
byte[] data = new byte[100000];
- Arrays.fill(data, (byte) 65);
- try (var writer = client.createBatchWriter(tableName)) {
- for (int row = 0; row < 10; row++) {
- Mutation m = new Mutation(row + "");
- m.at().family("big").qualifier("stuff").put(data);
- writer.addMutation(m);
- }
- }
- client.tableOperations().flush(tableName, null, null, true);
+ generateConfigurerTestData(tableName, client, data);
// without compression, expect file to be large
long sizes = CompactionExecutorIT.getFileSizes(client, tableName);
@@ -459,6 +451,55 @@ public class CompactionIT extends AccumuloClusterHarness {
}
}
+ @Test
+ public void testConfigurerSetOnTable() throws Exception {
+ String tableName = this.getUniqueNames(1)[0];
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+
+ byte[] data = new byte[100000];
+
+ Map<String,
+ String> props =
Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none",
+ Property.TABLE_COMPACTION_CONFIGURER.getKey(),
CompressionConfigurer.class.getName(),
+ Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey()
+ + CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE,
+ "gz", Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey()
+ + CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD,
+ "" + data.length);
+ NewTableConfiguration ntc = new
NewTableConfiguration().setProperties(props);
+ client.tableOperations().create(tableName, ntc);
+
+ generateConfigurerTestData(tableName, client, data);
+
+ // without compression, expect file to be large
+ long sizes = CompactionExecutorIT.getFileSizes(client, tableName);
+ assertTrue(sizes > data.length * 10 && sizes < data.length * 11,
+ "Unexpected files sizes : " + sizes);
+
+ client.tableOperations().compact(tableName, new
CompactionConfig().setWait(true));
+
+ // after compacting with compression, expect small file
+ sizes = CompactionExecutorIT.getFileSizes(client, tableName);
+ assertTrue(sizes < data.length,
+ "Unexpected files sizes: data: " + data.length + ", file:" + sizes);
+
+ }
+ }
+
+ private static void generateConfigurerTestData(String tableName,
AccumuloClient client,
+ byte[] data) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
+ Arrays.fill(data, (byte) 65);
+ try (var writer = client.createBatchWriter(tableName)) {
+ for (int row = 0; row < 10; row++) {
+ Mutation m = new Mutation(row + "");
+ m.at().family("big").qualifier("stuff").put(data);
+ writer.addMutation(m);
+ }
+ }
+ client.tableOperations().flush(tableName, null, null, true);
+ }
+
@Test
public void testSuccessfulCompaction() throws Exception {
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {