This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 03c06024a6 Backport Bulk Import test to 2.1 (#6113)
03c06024a6 is described below

commit 03c06024a6f5ad6c581bd390693a65b13b8cb31b
Author: Arbaaz Khan <[email protected]>
AuthorDate: Mon Feb 9 14:36:08 2026 -0500

    Backport Bulk Import test to 2.1 (#6113)
    
    * backport BulkNewIT.testManyTabletAndFiles() to 2.1
---
 .../apache/accumulo/test/functional/BulkNewIT.java | 64 ++++++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index eb52ec2c81..458ec594c8 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -39,6 +39,7 @@ import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -64,6 +65,7 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
@@ -73,6 +75,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.LoadPlan.RangeType;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.constraints.Constraint;
@@ -715,6 +718,67 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testManyTabletAndFiles() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      TreeSet<Text> splits = IntStream.range(1, 9000).mapToObj(BulkNewIT::row).map(Text::new)
+          .collect(Collectors.toCollection(TreeSet::new));
+      c.tableOperations().addSplits(tableName, splits);
+
+      final int numTasks = 16;
+      var executor = Executors.newFixedThreadPool(numTasks);
+      var futures = new ArrayList<Future<?>>();
+      // wait for a portion of the tasks to be ready
+      CountDownLatch startLatch = new CountDownLatch(numTasks);
+      assertTrue(numTasks >= startLatch.getCount(),
+          "Not enough tasks/threads to satisfy latch count - deadlock risk");
+
+      var loadPlanBuilder = LoadPlan.builder();
+      var rowsExpected = new HashSet<>();
+      var imports = IntStream.range(2, 8999).boxed().collect(Collectors.toList());
+      // The order in which imports are added to the load plan should not matter so test that.
+      Collections.shuffle(imports);
+      for (var data : imports) {
+        String filename = "f" + data + ".";
+        loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, RangeType.TABLE, row(data - 1),
+            row(data));
+        var future = executor.submit(() -> {
+          startLatch.countDown();
+          startLatch.await();
+          writeData(fs, dir + "/" + filename, aconf, data, data);
+          return null;
+        });
+        futures.add(future);
+        rowsExpected.add(row(data));
+      }
+      assertEquals(imports.size(), futures.size());
+
+      for (var future : futures) {
+        future.get();
+      }
+
+      executor.shutdown();
+
+      var loadPlan = loadPlanBuilder.build();
+
+      c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+
+      // using a batch scanner can read from lots of tablets w/ less RPCs
+      try (var scanner = c.createBatchScanner(tableName)) {
+        // use a scan server so that tablets do not need to be hosted
+        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
+        scanner.setRanges(List.of(new Range()));
+        var rowsSeen = scanner.stream().map(e -> e.getKey().getRowData().toString())
+            .collect(Collectors.toSet());
+        assertEquals(rowsExpected, rowsSeen);
+      }
+    }
+  }
+
   @Test
   public void testManyTablets() throws Exception {
 

Reply via email to