This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new b44069b Add external compaction IT b44069b is described below commit b44069bfb99d9fc76b65d11c839fd377f7bb9441 Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Apr 12 17:45:51 2021 -0400 Add external compaction IT --- .../server/compaction/ExternalCompactionUtil.java | 4 - .../accumulo/compactor/CompactionEnvironment.java | 26 ++++- .../org/apache/accumulo/compactor/Compactor.java | 3 +- test/pom.xml | 8 ++ .../apache/accumulo/test/ExternalCompactionIT.java | 126 +++++++++++++++++++++ 5 files changed, 159 insertions(+), 8 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java index 38c65a9..26ff813 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java @@ -65,8 +65,6 @@ public class ExternalCompactionUtil { /** * - * @param context - * server context * @return null if Coordinator node not found, else HostAndPort */ public static HostAndPort findCompactionCoordinator(ServerContext context) { @@ -85,8 +83,6 @@ public class ExternalCompactionUtil { } /** - * @param context - * server context * @return list of Compactors */ public static List<HostAndPort> getCompactorAddrs(ServerContext context) { diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java index fad132b..b37a737 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java @@ -36,16 +36,36 @@ import org.apache.accumulo.server.compaction.Compactor.CompactionEnv; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; +import com.google.common.annotations.VisibleForTesting; + public class CompactionEnvironment implements Closeable, CompactionEnv { private final ServerContext context; private final CompactionJobHolder jobHolder; private final SharedRateLimiterFactory limiter; + private String queueName; + + public static class CompactorIterEnv extends TabletIteratorEnvironment { + + private String queueName; + + public CompactorIterEnv(ServerContext context, IteratorScope scope, boolean fullMajC, + AccumuloConfiguration tableConfig, TableId tableId, CompactionKind kind, String queueName) { + super(context, scope, fullMajC, tableConfig, tableId, kind); + this.queueName = queueName; + } + + @VisibleForTesting + public String getQueueName() { + return queueName; + } + } - CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) { + CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder, String queueName) { this.context = context; this.jobHolder = jobHolder; this.limiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration()); + this.queueName = queueName; } @Override @@ -77,9 +97,9 @@ public class CompactionEnvironment implements Closeable, CompactionEnv { @Override public SystemIteratorEnvironment createIteratorEnv(ServerContext context, AccumuloConfiguration acuTableConf, TableId tableId) { - return new TabletIteratorEnvironment(context, IteratorScope.majc, + return new CompactorIterEnv(context, IteratorScope.majc, !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId, - CompactionKind.valueOf(jobHolder.getJob().getKind().name())); + CompactionKind.valueOf(jobHolder.getJob().getKind().name()), queueName); } @Override diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 130cda3..0190820 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -500,7 +500,8 @@ public class Compactor extends AbstractServer job.getIteratorSettings().getIterators() .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); - try (CompactionEnvironment cenv = new CompactionEnvironment(getContext(), JOB_HOLDER)) { + try (CompactionEnvironment cenv = + new CompactionEnvironment(getContext(), JOB_HOLDER, queueName)) { org.apache.accumulo.server.compaction.Compactor compactor = new org.apache.accumulo.server.compaction.Compactor(getContext(), KeyExtent.fromThrift(job.getExtent()), files, outputFile, diff --git a/test/pom.xml b/test/pom.xml index 4f24f05..921425e 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -68,6 +68,14 @@ </dependency> <dependency> <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-compaction-coordinator</artifactId> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-compactor</artifactId> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-core</artifactId> </dependency> <dependency> diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java new file mode 100644 index 0000000..1b47f58 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.compactor.CompactionEnvironment.CompactorIterEnv; +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; +import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.base.Preconditions; + +public class ExternalCompactionIT extends ConfigurableMacBase { + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty("tserver.compaction.major.service.cs1.planner", + DefaultCompactionPlanner.class.getName()); + cfg.setProperty("tserver.compaction.major.service.cs1.planner.opts.executors", + "[{'name':'all','externalQueue':'DCQ1'}]"); + } + + public static class TestFilter extends Filter { + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + IteratorEnvironment env) throws IOException { + super.init(source, options, env); + + CompactorIterEnv cienv = (CompactorIterEnv) env; + + Preconditions.checkArgument(!cienv.getQueueName().isEmpty()); + Preconditions + .checkArgument(options.getOrDefault("expectedQ", "").equals(cienv.getQueueName())); + } + + @Override + public boolean accept(Key k, Value v) { + return Integer.parseInt(v.toString()) % 2 == 0; + } + + } + + @Test + public void testExternalCompaction() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + Map<String,String> props = + Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), + "table.compaction.dispatcher.opts.service", "cs1"); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + + String tableName = "ectt"; + + client.tableOperations().create(tableName, ntc); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 10; i++) { + Mutation m = new Mutation("r:" + i); + m.put("", "", "" + i); + bw.addMutation(m); + } + } + + client.tableOperations().flush(tableName); + + cluster.exec(Compactor.class, "-q", "DCQ1"); + cluster.exec(CompactionCoordinator.class); + + IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); + // make sure iterator options make it to compactor process + iterSetting.addOption("expectedQ", "DCQ1"); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true); + client.tableOperations().compact(tableName, config); + + try (Scanner scanner = client.createScanner(tableName)) { + int count = 0; + for (Entry<Key,Value> entry : scanner) { + Assert.assertTrue(Integer.parseInt(entry.getValue().toString()) % 2 == 0); + count++; + } + + Assert.assertEquals(5, count); + } + } + } +}