This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 5fb05efd26d0c15f633dbd4e0d9ce8daa16282a5
Merge: 1db892b00d 8bf0970c7f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Feb 1 17:16:50 2024 -0500

    Merge branch '2.1'

 .../tserver/compactions/CompactionManager.java     |  8 +-
 .../tserver/compactions/CompactionService.java     |  1 +
 .../accumulo/test/functional/CompactionIT.java     | 88 ++++++++++++++++++++++
 3 files changed, 93 insertions(+), 4 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 7b633e338d,93c8c1713e..6df72f17d9
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -90,8 -77,9 +90,10 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
  import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+ import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
  import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
  import org.apache.accumulo.test.VerifyIngest;
  import org.apache.accumulo.test.VerifyIngest.VerifyParams;
@@@ -705,200 -628,94 +707,286 @@@ public class CompactionIT extends Accum
      }
    }
  
 +  @Test
 +  public void testSelectNoFiles() throws Exception {
 +
 +    // Test a compaction selector that selects no files. In this case there is no work to do,
 +    // so we want to ensure it does not hang
 +
 +    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
 +      String tableName = getUniqueNames(1)[0];
 +      c.tableOperations().create(tableName);
 +
 +      writeFlush(c, tableName, "a");
 +      writeFlush(c, tableName, "b");
 +
 +      CompactionConfig config = new CompactionConfig()
 +          .setSelector(new PluginConfig(EmptyCompactionSelector.class.getName(), Map.of()))
 +          .setWait(true);
 +      c.tableOperations().compact(tableName, config);
 +
 +      assertEquals(Set.of("a", "b"), getRows(c, tableName));
 +    }
 +
 +  }
 +
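 +  // Illustrative sketch, not code from this commit: the assumed shape of a selector
 +  // that selects no files, written against the CompactionSelector SPI in
 +  // org.apache.accumulo.core.client.admin.compaction. The class name below is
 +  // hypothetical; the real EmptyCompactionSelector used above may differ in detail.
 +  public static class EmptyCompactionSelectorSketch implements CompactionSelector {
 +    @Override
 +    public void init(InitParameters iparams) {}
 +
 +    @Override
 +    public Selection select(SelectionParameters sparams) {
 +      // Selecting zero files leaves the compaction with no work to do; the
 +      // operation should complete promptly instead of hanging.
 +      return new Selection(Set.of());
 +    }
 +  }
 +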
 +  @Test
 +  public void testConcurrent() throws Exception {
 +    // two compactions without iterators or strategy should be able to run concurrently
 +
 +    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      String tableName = getUniqueNames(1)[0];
 +      c.tableOperations().create(tableName);
 +
 +      // write random data because it's very unlikely it will compress
 +      writeRandomValue(c, tableName, 1 << 16);
 +      writeRandomValue(c, tableName, 1 << 16);
 +
 +      c.tableOperations().compact(tableName, new CompactionConfig().setWait(false));
 +      c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
 +
 +      assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
 +
 +      writeRandomValue(c, tableName, 1 << 16);
 +
 +      IteratorSetting iterConfig = new IteratorSetting(30, SlowIterator.class);
 +      SlowIterator.setSleepTime(iterConfig, 1000);
 +
 +      long t1 = System.currentTimeMillis();
 +      c.tableOperations().compact(tableName,
 +          new CompactionConfig().setWait(false).setIterators(java.util.Arrays.asList(iterConfig)));
 +      try {
 +        // this compaction should fail because the previous one set iterators
 +        c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
 +        if (System.currentTimeMillis() - t1 < 2000) {
 +          fail("Expected compaction to fail because another concurrent 
compaction set iterators");
 +        }
 +      } catch (AccumuloException e) {}
 +    }
 +  }
 +
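 +  // Usage note (an assumption, not from this commit): a queued user compaction like
 +  // the second compact() call above can be backed out with the long-standing
 +  // TableOperations API:
 +  //
 +  //   c.tableOperations().cancelCompaction(tableName);
 +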
 +  @Test
 +  public void testGetSelectedFilesForCompaction() throws Exception {
 +
 +    // Tests CompactionConfigurer.InputParameters.getSelectedFiles()
 +
 +    String tableName = this.getUniqueNames(1)[0];
 +    // Disable GC so intermediate compaction files are not deleted
 +    getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
 +
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +      Map<String,String> props = new HashMap<>();
 +      props.put(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
 +      // This is done to avoid system compactions - we want to do all the compactions ourselves
 +      props.put("table.compaction.dispatcher.opts.service.system", "nonexistent");
 +      NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props);
 +      client.tableOperations().create(tableName, ntc);
 +
 +      // The following will create 4 small and 4 large RFiles
 +      // The 4 small files will be compacted into one file (an "intermediate compaction" file)
 +      // Then, this file will be compacted with the 4 large files, creating the final compaction
 +      // file
 +      byte[] largeData = new byte[1_000_000];
 +      byte[] smallData = new byte[100_000];
 +      final int numFiles = 8;
 +      Arrays.fill(largeData, (byte) 65);
 +      Arrays.fill(smallData, (byte) 65);
 +      try (var writer = client.createBatchWriter(tableName)) {
 +        for (int i = 0; i < numFiles; i++) {
 +          Mutation mut = new Mutation("r" + i);
 +          if (i < numFiles / 2) {
 +            mut.at().family("f").qualifier("q").put(largeData);
 +          } else {
 +            mut.at().family("f").qualifier("q").put(smallData);
 +          }
 +          writer.addMutation(mut);
 +          writer.flush();
 +          client.tableOperations().flush(tableName, null, null, true);
 +        }
 +      }
 +
 +      client.tableOperations().compact(tableName,
 +          new CompactionConfig().setWait(true)
 +              .setConfigurer(new PluginConfig(CompressionTypeConfigurer.class.getName(),
 +                  Map.of(CompressionTypeConfigurer.FINAL_COMPRESS_TYPE_KEY, "snappy",
 +                      CompressionTypeConfigurer.INTERMEDIATE_COMPRESS_TYPE_KEY, "gz"))));
 +
 +      var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
 +      // The directory of the RFiles
 +      java.nio.file.Path rootPath = null;
 +      // The path to the final compaction RFile (located within rootPath)
 +      java.nio.file.Path finalCompactionFilePath = null;
 +      int count = 0;
 +      try (var tabletsMeta =
 +          TabletsMetadata.builder(client).forTable(tableId).fetch(ColumnType.FILES).build()) {
 +        for (TabletMetadata tm : tabletsMeta) {
 +          for (StoredTabletFile stf : tm.getFiles()) {
 +            // Since the 8 files should be compacted down to 1 file, these should only be set once
 +            finalCompactionFilePath = Paths.get(stf.getPath().toUri().getRawPath());
 +            rootPath = Paths.get(stf.getPath().getParent().toUri().getRawPath());
 +            count++;
 +          }
 +        }
 +      }
 +      assertEquals(1, count);
 +      assertNotNull(finalCompactionFilePath);
 +      assertNotNull(rootPath);
 +      String finalCompactionFile = finalCompactionFilePath.toString();
 +      // The following will find the intermediate compaction file in the root path.
 +      // Intermediate compaction files begin with 'C' and end with '.rf'
 +      final String[] interCompactionFile = {null};
 +      Files.walkFileTree(rootPath, new SimpleFileVisitor<java.nio.file.Path>() {
 +        @Override
 +        public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
 +            throws IOException {
 +          String regex = "^C.*\\.rf$";
 +          java.nio.file.Path fileName = (file != null) ? file.getFileName() : null;
 +          if (fileName != null && fileName.toString().matches(regex)) {
 +            interCompactionFile[0] = file.toString();
 +            return FileVisitResult.TERMINATE;
 +          }
 +          return FileVisitResult.CONTINUE;
 +        }
 +      });
 +      assertNotNull(interCompactionFile[0]);
 +      String[] args = new String[3];
 +      args[0] = "--props";
 +      args[1] = getCluster().getAccumuloPropertiesPath();
 +      args[2] = finalCompactionFile;
 +      PrintBCInfo bcInfo = new PrintBCInfo(args);
 +      String finalCompressionType = bcInfo.getCompressionType();
 +      // The compression type used on the final compaction file should be 'snappy'
 +      assertEquals("snappy", finalCompressionType);
 +      args[2] = interCompactionFile[0];
 +      bcInfo = new PrintBCInfo(args);
 +      String interCompressionType = bcInfo.getCompressionType();
 +      // The compression type used on the intermediate compaction file should be 'gz'
 +      assertEquals("gz", interCompressionType);
 +    } finally {
 +      // Re-enable GC
 +      getCluster().getClusterControl().startAllServers(ServerType.GARBAGE_COLLECTOR);
 +    }
 +  }
 +
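 +  // Illustrative sketch, not code from this commit: a configurer in the spirit of
 +  // CompressionTypeConfigurer, built on the CompactionConfigurer SPI. It relies on
 +  // InputParameters.getSelectedFiles(), the method this test exercises, to decide
 +  // whether a job produces the final file (its inputs cover every selected file) or
 +  // an intermediate one. The class name and option keys below are hypothetical.
 +  public static class CompressionTypeConfigurerSketch implements CompactionConfigurer {
 +    private String finalType;
 +    private String interType;
 +
 +    @Override
 +    public void init(InitParameters iparams) {
 +      // Hypothetical option keys; the real FINAL/INTERMEDIATE keys may differ.
 +      finalType = iparams.getOptions().get("final.compress.type");
 +      interType = iparams.getOptions().get("intermediate.compress.type");
 +    }
 +
 +    @Override
 +    public Overrides override(InputParameters params) {
 +      // Final compaction: the job's inputs include all files selected for compaction.
 +      boolean isFinal = params.getInputFiles().containsAll(params.getSelectedFiles());
 +      return new Overrides(
 +          Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), isFinal ? finalType : interType));
 +    }
 +  }
 +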
 +  /**
 +   * Was used in debugging {@link #testGetSelectedFilesForCompaction}. May be useful later.
 +   *
 +   * @param client An Accumulo client
 +   * @param tableName The name of the table
 +   * @return a map of the RFiles to their size in bytes
 +   */
 +  private Map<String,Long> getFileSizeMap(AccumuloClient client, String tableName) {
 +    var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
 +    Map<String,Long> map = new HashMap<>();
 +
 +    try (var tabletsMeta =
 +        TabletsMetadata.builder(client).forTable(tableId).fetch(ColumnType.FILES).build()) {
 +      for (TabletMetadata tm : tabletsMeta) {
 +        for (StoredTabletFile stf : tm.getFiles()) {
 +          try {
 +            String filePath = stf.getPath().toString();
 +            Long fileSize =
 +                FileSystem.getLocal(new Configuration()).getFileStatus(stf.getPath()).getLen();
 +            map.put(filePath, fileSize);
 +          } catch (IOException e) {
 +            throw new UncheckedIOException(e);
 +          }
 +        }
 +      }
 +
 +      return map;
 +    }
 +  }
 +
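 +  // Hypothetical debugging use of the helper above (not from this commit):
 +  //
 +  //   getFileSizeMap(client, tableName)
 +  //       .forEach((file, size) -> System.out.println(file + " -> " + size + " bytes"));
 +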
+   @Test
+   public void testDeleteCompactionService() throws Exception {
+     try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+       var uniqueNames = getUniqueNames(2);
+       String table1 = uniqueNames[0];
+       String table2 = uniqueNames[1];
+ 
+       // create a compaction service named deleteme
+       c.instanceOperations().setProperty(
 -          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner",
+           DefaultCompactionPlanner.class.getName());
+       c.instanceOperations().setProperty(
 -          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors",
+           "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", 
"\""));
+ 
+       // create a compaction service named keepme
+       c.instanceOperations().setProperty(
 -          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner",
+           DefaultCompactionPlanner.class.getName());
+       c.instanceOperations().setProperty(
 -          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner.opts.executors",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner.opts.executors",
+           "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", 
"\""));
+ 
+       // create a table that uses the compaction service deleteme
+       Map<String,String> props = new HashMap<>();
+       props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
+           SimpleCompactionDispatcher.class.getName());
+       props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "deleteme");
+       c.tableOperations().create(table1, new NewTableConfiguration().setProperties(props));
+ 
+       // create a table that uses the compaction service keepme
+       props.clear();
+       props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
+           SimpleCompactionDispatcher.class.getName());
+       props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "keepme");
+       c.tableOperations().create(table2, new NewTableConfiguration().setProperties(props));
+ 
+       try (var writer1 = c.createBatchWriter(table1); var writer2 = c.createBatchWriter(table2)) {
+         for (int i = 0; i < 10; i++) {
+           Mutation m = new Mutation("" + i);
+           m.put("f", "q", "" + i);
+           writer1.addMutation(m);
+           writer2.addMutation(m);
+         }
+       }
+ 
+       c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+       c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+ 
+       // delete the compaction service deleteme
+       c.instanceOperations()
 -          .removeProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner");
++          .removeProperty(Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner");
+       c.instanceOperations().removeProperty(
 -          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors");
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors");
+ 
+       // add a new compaction service named newcs
+       c.instanceOperations().setProperty(
 -          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
+           DefaultCompactionPlanner.class.getName());
+       c.instanceOperations().setProperty(
 -          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors",
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors",
+           "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", 
"\""));
+ 
+       // set table 1 to use the compaction service newcs
+       c.tableOperations().setProperty(table1,
+           Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "newcs");
+ 
+       // ensure tables can still compact and are not impacted by the deleted compaction service
+       for (int i = 0; i < 10; i++) {
+         c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+         c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+ 
+         try (var scanner = c.createScanner(table1)) {
+           assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue)
+               .mapToInt(v -> Integer.parseInt(v.toString())).sum());
+         }
+         try (var scanner = c.createScanner(table2)) {
+           assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue)
+               .mapToInt(v -> Integer.parseInt(v.toString())).sum());
+         }
+ 
+         Thread.sleep(100);
+       }
+     }
+   }
+ 
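+   // Note on the paired ' -'/'++' lines above (the expanded key strings here are
+   // assumptions, not shown in this diff): the merge resolves a property rename, so
+   // the same planner setting is addressed by a different key on each branch:
+   //
+   //   2.1:  Property.TSERV_COMPACTION_SERVICE_PREFIX + "deleteme.planner"
+   //         -> tserver.compaction.major.service.deleteme.planner
+   //   main: Property.COMPACTION_SERVICE_PREFIX + "deleteme.planner"
+   //         -> compaction.service.deleteme.planner
+ 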
    private int countFiles(AccumuloClient c) throws Exception {
 -    try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
 +    try (Scanner s = c.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
        s.fetchColumnFamily(new Text(TabletColumnFamily.NAME));
        s.fetchColumnFamily(new Text(DataFileColumnFamily.NAME));
        return Iterators.size(s.iterator());
