This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 06ba9320ffbc5331776b0e9625d68eee9d93dc2f Merge: 3dfe28e8e1 bf50db77ec Author: Keith Turner <[email protected]> AuthorDate: Mon Jan 26 21:51:43 2026 +0000 Merge remote-tracking branch 'upstream/2.1' .../core/iteratorsImpl/IteratorConfigUtil.java | 3 +- .../accumulo/test/functional/CompactionIT.java | 51 ++++++++++++++++ .../accumulo/test/functional/ScanIteratorIT.java | 67 ++++++++++++++++++++++ 3 files changed, 120 insertions(+), 1 deletion(-) diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 20a99852e6,a927bc4a3c..09d05fe3f4 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@@ -94,24 -92,24 +94,25 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactionPlan; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; -import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.core.util.CountDownTimer; +import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; -import org.apache.accumulo.test.compaction.CompactionExecutorIT; -import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector; +import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils; + import org.apache.accumulo.test.functional.ScanIteratorIT.AppendingIterator; import org.apache.accumulo.test.util.Wait; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; +import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -1206,6 -887,158 +1207,56 @@@ public class CompactionIT extends Compa } } - @Test - public void testMigrationCancelCompaction() throws Exception { - - // This test creates 40 tablets w/ slow iterator, causes 40 compactions to start, and then - // starts a new tablet server. Some of the tablets should migrate to the new tserver and cancel - // their compaction. Because the test uses a slow iterator, if close blocks on compaction then - // the test should timeout. Two tables are used to have different iterator settings inorder to - // test the two different way compactions can be canceled. Compactions can be canceled by thread - // interrupt or by a check that is done after a compaction iterator returns a key value. - - final String[] tables = this.getUniqueNames(2); - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(), - "[{'name':'any','numThreads':20}]".replaceAll("'", "\"")); - - SortedSet<Text> splits = IntStream.range(1, 20).mapToObj(i -> String.format("%06d", i * 1000)) - .map(Text::new).collect(Collectors.toCollection(TreeSet::new)); - - // This iterator is intended to cover the case of a compaction being canceled by thread - // interrupt. - IteratorSetting setting1 = new IteratorSetting(50, "sleepy", SlowIterator.class); - setting1.addOption("sleepTime", "300000"); - setting1.addOption("seekSleepTime", "3000"); - SlowIterator.sleepUninterruptibly(setting1, false); - - client.tableOperations().create(tables[0], new NewTableConfiguration().withSplits(splits) - .attachIterator(setting1, EnumSet.of(IteratorScope.majc))); - - // This iterator is intended to cover the case of compaction being canceled by the check after - // a key value is returned. The iterator is configured to ignore interrupts. - IteratorSetting setting2 = new IteratorSetting(50, "sleepy", SlowIterator.class); - setting2.addOption("sleepTime", "2000"); - setting2.addOption("seekSleepTime", "2000"); - SlowIterator.sleepUninterruptibly(setting2, true); - - client.tableOperations().create(tables[1], new NewTableConfiguration().withSplits(splits) - .attachIterator(setting2, EnumSet.of(IteratorScope.majc))); - - // write files to each tablet, should cause compactions to start - for (var table : tables) { - for (int round = 0; round < 5; round++) { - try (var writer = client.createBatchWriter(table)) { - for (int i = 0; i < 20_000; i++) { - Mutation m = new Mutation(String.format("%06d", i)); - m.put("f", "q", "v"); - writer.addMutation(m); - } - } - client.tableOperations().flush(table, null, null, true); - } - } - - assertEquals(2, client.instanceOperations().getTabletServers().size()); - - var tableId1 = ((ClientContext) client).getTableId(tables[0]); - var tableId2 = ((ClientContext) client).getTableId(tables[1]); - - Wait.waitFor(() -> { - var runningCompactions = client.instanceOperations().getActiveCompactions().stream() - .map(ac -> ac.getTablet().getTable()) - .filter(tid -> tid.equals(tableId1) || tid.equals(tableId2)).count(); - log.debug("Running compactions {}", runningCompactions); - return runningCompactions == 40; - }); - - ((MiniAccumuloClusterImpl) getCluster()).getConfig().setNumTservers(3); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER, "localhost"); - - Wait.waitFor(() -> { - var servers = client.instanceOperations().getTabletServers().size(); - log.debug("Server count {}", servers); - return 3 == servers; - }); - - Wait.waitFor(() -> { - try (var tablets = - ((ClientContext) client).getAmple().readTablets().forLevel(Ample.DataLevel.USER) - .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build()) { - Map<String,Long> counts = new HashMap<>(); - for (var tablet : tablets) { - if (!tablet.getTableId().equals(tableId1) && !tablet.getTableId().equals(tableId2)) { - continue; - } - - if (tablet.getLocation() != null - && tablet.getLocation().getType() == TabletMetadata.LocationType.CURRENT) { - counts.merge(tablet.getLocation().getHostPort(), 1L, Long::sum); - } - } - - var total = counts.values().stream().mapToLong(l -> l).sum(); - var min = counts.values().stream().mapToLong(l -> l).min().orElse(0); - var max = counts.values().stream().mapToLong(l -> l).max().orElse(100); - var serversSeen = counts.keySet(); - log.debug("total:{} min:{} max:{} serversSeen:{}", total, min, max, serversSeen); - return total == 40 && min == 12 && max == 14 && serversSeen.size() == 3; - } - }); - } - } - + @Test + public void testIteratorOrder() throws Exception { + String[] names = getUniqueNames(2); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + // create a table with minor compaction iterators configured to ensure those iterators are + // applied in the correct order + NewTableConfiguration ntc = new NewTableConfiguration() + .attachIterator(AppendingIterator.configure(50, "x"), EnumSet.of(IteratorScope.minc)) + .attachIterator(AppendingIterator.configure(100, "a"), EnumSet.of(IteratorScope.minc)); + c.tableOperations().create(names[0], ntc); + + // create a table with major compaction iterators configured to ensure those iterators are + // applied in the correct order + NewTableConfiguration ntc2 = new NewTableConfiguration() + .attachIterator(AppendingIterator.configure(50, "x"), EnumSet.of(IteratorScope.majc)) + .attachIterator(AppendingIterator.configure(100, "a"), EnumSet.of(IteratorScope.majc)); + c.tableOperations().create(names[1], ntc2); + + try (var writer = c.createBatchWriter(names[0]); + var writer2 = c.createBatchWriter(names[1])) { + Mutation m = new Mutation("r1"); + m.put("", "", "base:"); + writer.addMutation(m); + writer2.addMutation(m); + } + + try (var mincScanner = c.createScanner(names[0]); + var majcScanner = c.createScanner(names[1])) { + // iterators should not be applied yet + assertEquals("base:", mincScanner.iterator().next().getValue().toString()); + assertEquals("base:", majcScanner.iterator().next().getValue().toString()); + + c.tableOperations().flush(names[0], null, null, true); + assertEquals("base:xa", mincScanner.iterator().next().getValue().toString()); + assertEquals("base:", majcScanner.iterator().next().getValue().toString()); + + // The user compaction iterators with priority 50 and 100 have the same priority as table + // level iterators. + List<IteratorSetting> iters = List.of(AppendingIterator.configure(70, "m"), + AppendingIterator.configure(50, "b"), AppendingIterator.configure(100, "c")); + c.tableOperations().compact(names[1], + new CompactionConfig().setWait(true).setFlush(true).setIterators(iters)); + assertEquals("base:xa", mincScanner.iterator().next().getValue().toString()); + assertEquals("base:bxmac", majcScanner.iterator().next().getValue().toString()); + + } + } + } + /** * Counts the number of tablets and files in a table. */ diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java index 65ee4c84fb,1f10949423..38912fc9c3 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java @@@ -18,8 -18,10 +18,9 @@@ */ package org.apache.accumulo.test.functional; -import static org.apache.accumulo.harness.AccumuloITBase.SUNNY_DAY; import static org.junit.jupiter.api.Assertions.assertEquals; + import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections;
