This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 5fb05efd26d0c15f633dbd4e0d9ce8daa16282a5
Merge: 1db892b00d 8bf0970c7f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Feb 1 17:16:50 2024 -0500

    Merge branch '2.1'

 .../tserver/compactions/CompactionManager.java |  8 +-
 .../tserver/compactions/CompactionService.java |  1 +
 .../accumulo/test/functional/CompactionIT.java | 88 ++++++++++++++++++++++
 3 files changed, 93 insertions(+), 4 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 7b633e338d,93c8c1713e..6df72f17d9
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -90,8 -77,9 +90,10 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
  import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+ import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
  import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
  import org.apache.accumulo.test.VerifyIngest;
  import org.apache.accumulo.test.VerifyIngest.VerifyParams;
@@@ -705,200 -628,94 +707,286 @@@ public class CompactionIT extends Accum
      }
    }
 
+   @Test
+   public void testSelectNoFiles() throws Exception {
+ 
+     // Test a compaction selector that selects no files. In this case there is no work to do,
+     // so we want to ensure it does not hang
+ 
+     try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+       String tableName = getUniqueNames(1)[0];
+       c.tableOperations().create(tableName);
+ 
+       writeFlush(c, tableName, "a");
+       writeFlush(c, tableName, "b");
+ 
+       CompactionConfig config = new CompactionConfig()
+           .setSelector(new PluginConfig(EmptyCompactionSelector.class.getName(), Map.of()))
+           .setWait(true);
+       c.tableOperations().compact(tableName, config);
+ 
+       assertEquals(Set.of("a", "b"), getRows(c, tableName));
+     }
+ 
+   }
+ 
+   @Test
+   public void testConcurrent() throws Exception {
+     // two compactions without iterators or strategy should be able to run concurrently
+ 
+     try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+ 
+       String tableName = getUniqueNames(1)[0];
+       c.tableOperations().create(tableName);
+ 
+       // write random data because it's very unlikely it will compress
+       writeRandomValue(c, tableName, 1 << 16);
+       writeRandomValue(c, tableName, 1 << 16);
+ 
+       c.tableOperations().compact(tableName, new CompactionConfig().setWait(false));
+       c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+ 
+       assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+ 
+       writeRandomValue(c, tableName, 1 << 16);
+ 
+       IteratorSetting iterConfig = new IteratorSetting(30, SlowIterator.class);
+       SlowIterator.setSleepTime(iterConfig, 1000);
+ 
+       long t1 = System.currentTimeMillis();
+       c.tableOperations().compact(tableName,
+           new CompactionConfig().setWait(false).setIterators(java.util.Arrays.asList(iterConfig)));
+       try {
+         // this compaction should fail because previous one set iterators
+         c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+         if (System.currentTimeMillis() - t1 < 2000) {
+           fail("Expected compaction to fail because another concurrent compaction set iterators");
+         }
+       } catch (AccumuloException e) {}
+     }
+   }
+ 
+   @Test
+   public void testGetSelectedFilesForCompaction() throws Exception {
+ 
+     // Tests CompactionConfigurer.InputParameters.getSelectedFiles()
+ 
+     String tableName = this.getUniqueNames(1)[0];
+     // Disable GC so intermediate compaction files are not deleted
+     getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+ 
+     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       Map<String,String> props = new HashMap<>();
+       props.put(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
+       // This is done to avoid system compactions - we want to do all the compactions ourselves
+       props.put("table.compaction.dispatcher.opts.service.system", "nonexistent");
+       NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props);
+       client.tableOperations().create(tableName, ntc);
+ 
+       // The following will create 4 small and 4 large RFiles
+       // The 4 small files will be compacted into one file (an "intermediate compaction" file)
+       // Then, this file will be compacted with the 4 large files, creating the final compaction
+       // file
+       byte[] largeData = new byte[1_000_000];
+       byte[] smallData = new byte[100_000];
+       final int numFiles = 8;
+       Arrays.fill(largeData, (byte) 65);
+       Arrays.fill(smallData, (byte) 65);
+       try (var writer = client.createBatchWriter(tableName)) {
+         for (int i = 0; i < numFiles; i++) {
+           Mutation mut = new Mutation("r" + i);
+           if (i < numFiles / 2) {
+             mut.at().family("f").qualifier("q").put(largeData);
+           } else {
+             mut.at().family("f").qualifier("q").put(smallData);
+           }
+           writer.addMutation(mut);
+           writer.flush();
+           client.tableOperations().flush(tableName, null, null, true);
+         }
+       }
+ 
+       client.tableOperations().compact(tableName,
+           new CompactionConfig().setWait(true)
+               .setConfigurer(new PluginConfig(CompressionTypeConfigurer.class.getName(),
+                   Map.of(CompressionTypeConfigurer.FINAL_COMPRESS_TYPE_KEY, "snappy",
+                       CompressionTypeConfigurer.INTERMEDIATE_COMPRESS_TYPE_KEY, "gz"))));
+ 
+       var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
+       // The directory of the RFiles
+       java.nio.file.Path rootPath = null;
+       // The path to the final compaction RFile (located within rootPath)
+       java.nio.file.Path finalCompactionFilePath = null;
+       int count = 0;
+       try (var tabletsMeta =
+           TabletsMetadata.builder(client).forTable(tableId).fetch(ColumnType.FILES).build()) {
+         for (TabletMetadata tm : tabletsMeta) {
+           for (StoredTabletFile stf : tm.getFiles()) {
+             // Since the 8 files should be compacted down to 1 file, these should only be set once
+             finalCompactionFilePath = Paths.get(stf.getPath().toUri().getRawPath());
+             rootPath = Paths.get(stf.getPath().getParent().toUri().getRawPath());
+             count++;
+           }
+         }
+       }
+       assertEquals(1, count);
+       assertNotNull(finalCompactionFilePath);
+       assertNotNull(rootPath);
+       String finalCompactionFile = finalCompactionFilePath.toString();
+       // The following will find the intermediate compaction file in the root path.
+       // Intermediate compaction files begin with 'C' and end with '.rf'
+       final String[] interCompactionFile = {null};
+       Files.walkFileTree(rootPath, new SimpleFileVisitor<java.nio.file.Path>() {
+         @Override
+         public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+             throws IOException {
+           String regex = "^C.*\\.rf$";
+           java.nio.file.Path fileName = (file != null) ? file.getFileName() : null;
+           if (fileName != null && fileName.toString().matches(regex)) {
+             interCompactionFile[0] = file.toString();
+             return FileVisitResult.TERMINATE;
+           }
+           return FileVisitResult.CONTINUE;
+         }
+       });
+       assertNotNull(interCompactionFile[0]);
+       String[] args = new String[3];
+       args[0] = "--props";
+       args[1] = getCluster().getAccumuloPropertiesPath();
+       args[2] = finalCompactionFile;
+       PrintBCInfo bcInfo = new PrintBCInfo(args);
+       String finalCompressionType = bcInfo.getCompressionType();
+       // The compression type used on the final compaction file should be 'snappy'
+       assertEquals("snappy", finalCompressionType);
+       args[2] = interCompactionFile[0];
+       bcInfo = new PrintBCInfo(args);
+       String interCompressionType = bcInfo.getCompressionType();
+       // The compression type used on the intermediate compaction file should be 'gz'
+       assertEquals("gz", interCompressionType);
+     } finally {
+       // Re-enable GC
+       getCluster().getClusterControl().startAllServers(ServerType.GARBAGE_COLLECTOR);
+     }
+   }
+ 
+   /**
+    * Was used in debugging {@link #testGetSelectedFilesForCompaction}. May be useful later.
+    *
+    * @param client An Accumulo client
+    * @param tableName The name of the table
+    * @return a map of the RFiles to their size in bytes
+    */
+   private Map<String,Long> getFileSizeMap(AccumuloClient client, String tableName) {
+     var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
+     Map<String,Long> map = new HashMap<>();
+ 
+     try (var tabletsMeta =
+         TabletsMetadata.builder(client).forTable(tableId).fetch(ColumnType.FILES).build()) {
+       for (TabletMetadata tm : tabletsMeta) {
+         for (StoredTabletFile stf : tm.getFiles()) {
+           try {
+             String filePath = stf.getPath().toString();
+             Long fileSize =
+                 FileSystem.getLocal(new Configuration()).getFileStatus(stf.getPath()).getLen();
+             map.put(filePath, fileSize);
+           } catch (IOException e) {
+             throw new UncheckedIOException(e);
+           }
+         }
+       }
+ 
+       return map;
+     }
+   }
+ 
+   @Test
+   public void testDeleteCompactionService() throws Exception {
+     try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+       var uniqueNames = getUniqueNames(2);
+       String table1 = uniqueNames[0];
+       String table2 = uniqueNames[1];
+ 
+       // create a compaction service named deleteme
+       c.instanceOperations().setProperty(
-           Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner",
+           DefaultCompactionPlanner.class.getName());
+       c.instanceOperations().setProperty(
-           Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors",
+           "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
+ 
+       // create a compaction service named keepme
+       c.instanceOperations().setProperty(
-           Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner",
+           DefaultCompactionPlanner.class.getName());
+       c.instanceOperations().setProperty(
-           Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner.opts.executors",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner.opts.executors",
+           "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
+ 
+       // create a table that uses the compaction service deleteme
+       Map<String,String> props = new HashMap<>();
+       props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
+           SimpleCompactionDispatcher.class.getName());
+       props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "deleteme");
+       c.tableOperations().create(table1, new NewTableConfiguration().setProperties(props));
+ 
+       // create a table that uses the compaction service keepme
+       props.clear();
+       props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
+           SimpleCompactionDispatcher.class.getName());
+       props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "keepme");
+       c.tableOperations().create(table2, new NewTableConfiguration().setProperties(props));
+ 
+       try (var writer1 = c.createBatchWriter(table1); var writer2 = c.createBatchWriter(table2)) {
+         for (int i = 0; i < 10; i++) {
+           Mutation m = new Mutation("" + i);
+           m.put("f", "q", "" + i);
+           writer1.addMutation(m);
+           writer2.addMutation(m);
+         }
+       }
+ 
+       c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+       c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+ 
+       // delete the compaction service deleteme
+       c.instanceOperations()
-           .removeProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner");
++          .removeProperty(Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner");
+       c.instanceOperations().removeProperty(
-           Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors");
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors");
+ 
+       // add a new compaction service named newcs
+       c.instanceOperations().setProperty(
-           Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
+           DefaultCompactionPlanner.class.getName());
+       c.instanceOperations().setProperty(
-           Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors",
+           "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
+ 
+       // set table1 to use the new compaction service newcs
+       c.tableOperations().setProperty(table1,
+           Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "newcs");
+ 
+       // ensure tables can still compact and are not impacted by the deleted compaction service
+       for (int i = 0; i < 10; i++) {
+         c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+         c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+ 
+         try (var scanner = c.createScanner(table1)) {
+           assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue)
+               .mapToInt(v -> Integer.parseInt(v.toString())).sum());
+         }
+         try (var scanner = c.createScanner(table2)) {
+           assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue)
+               .mapToInt(v -> Integer.parseInt(v.toString())).sum());
+         }
+ 
+         Thread.sleep(100);
+       }
+     }
+   }
+ 
    private int countFiles(AccumuloClient c) throws Exception {
-     try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+     try (Scanner s = c.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
      s.fetchColumnFamily(new Text(TabletColumnFamily.NAME));
      s.fetchColumnFamily(new Text(DataFileColumnFamily.NAME));
      return Iterators.size(s.iterator());
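Note: EmptyCompactionSelector, referenced by testSelectNoFiles above, is defined elsewhere in
CompactionIT and is not part of this hunk. A minimal sketch of such a selector, assuming the
org.apache.accumulo.core.client.admin.compaction.CompactionSelector SPI, could look like:

    import java.util.Set;
    import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;

    // Selects no files; a compaction configured with this selector should
    // complete as a no-op rather than hang.
    public class EmptyCompactionSelector implements CompactionSelector {
      @Override
      public void init(InitParameters iparams) {
        // no configuration needed
      }

      @Override
      public Selection select(SelectionParameters sparams) {
        // an empty selection means there is no work to do
        return new Selection(Set.of());
      }
    }

As shown in testSelectNoFiles, such a selector is attached to a compaction via
new PluginConfig(EmptyCompactionSelector.class.getName(), Map.of()).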