This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 03c06024a6 Backport Bulk Import test to 2.1 (#6113)
03c06024a6 is described below
commit 03c06024a6f5ad6c581bd390693a65b13b8cb31b
Author: Arbaaz Khan <[email protected]>
AuthorDate: Mon Feb 9 14:36:08 2026 -0500
Backport Bulk Import test to 2.1 (#6113)
* backport BulkNewIT.testManyTabletAndFiles() to 2.1
---
.../apache/accumulo/test/functional/BulkNewIT.java | 64 ++++++++++++++++++++++
1 file changed, 64 insertions(+)
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index eb52ec2c81..458ec594c8 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -39,6 +39,7 @@ import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -64,6 +65,7 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
@@ -73,6 +75,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.constraints.Constraint;
@@ -715,6 +718,67 @@ public class BulkNewIT extends SharedMiniClusterBase {
}
}
+ @Test
+ public void testManyTabletAndFiles() throws Exception {
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+ String dir = getDir("/testBulkFile-");
+ FileSystem fs = getCluster().getFileSystem();
+ fs.mkdirs(new Path(dir));
+
+ TreeSet<Text> splits = IntStream.range(1, 9000).mapToObj(BulkNewIT::row).map(Text::new)
+ .collect(Collectors.toCollection(TreeSet::new));
+ c.tableOperations().addSplits(tableName, splits);
+
+ final int numTasks = 16;
+ var executor = Executors.newFixedThreadPool(numTasks);
+ var futures = new ArrayList<Future<?>>();
+ // wait for a portion of the tasks to be ready
+ CountDownLatch startLatch = new CountDownLatch(numTasks);
+ assertTrue(numTasks >= startLatch.getCount(),
+ "Not enough tasks/threads to satisfy latch count - deadlock risk");
+
+ var loadPlanBuilder = LoadPlan.builder();
+ var rowsExpected = new HashSet<>();
+ var imports = IntStream.range(2, 8999).boxed().collect(Collectors.toList());
+ // The order in which imports are added to the load plan should not matter so test that.
+ Collections.shuffle(imports);
+ for (var data : imports) {
+ String filename = "f" + data + ".";
+ loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, RangeType.TABLE, row(data - 1),
+ row(data));
+ var future = executor.submit(() -> {
+ startLatch.countDown();
+ startLatch.await();
+ writeData(fs, dir + "/" + filename, aconf, data, data);
+ return null;
+ });
+ futures.add(future);
+ rowsExpected.add(row(data));
+ }
+ assertEquals(imports.size(), futures.size());
+
+ for (var future : futures) {
+ future.get();
+ }
+
+ executor.shutdown();
+
+ var loadPlan = loadPlanBuilder.build();
+
+
+ c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+
+ // using a batch scanner can read from lots of tablets w/ less RPCs
+ try (var scanner = c.createBatchScanner(tableName)) {
+ // use a scan server so that tablets do not need to be hosted
+ scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
+ scanner.setRanges(List.of(new Range()));
+ var rowsSeen = scanner.stream().map(e -> e.getKey().getRowData().toString())
+ .collect(Collectors.toSet());
+ assertEquals(rowsExpected, rowsSeen);
+ }
+ }
+ }
+
@Test
public void testManyTablets() throws Exception {