This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new c5877f846e adds function to compute load plans when splits are unknown 
(#5982)
c5877f846e is described below

commit c5877f846ecee8da422179b3572d4e2295559196
Author: Keith Turner <[email protected]>
AuthorDate: Tue Mar 3 11:38:13 2026 -0800

    adds function to compute load plans when splits are unknown (#5982)
    
    fixes #5971
    
    
    Co-authored-by: Dave Marion <[email protected]>
---
 .../org/apache/accumulo/core/data/LoadPlan.java    | 60 ++++++++++++++++++++--
 .../core/client/rfile/RFileClientTest.java         | 41 +++++++++++++++
 .../apache/accumulo/core/crypto/CryptoTest.java    | 21 ++++++++
 3 files changed, 118 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java 
b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
index 2d1fd04e45..a874ae952e 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
@@ -38,7 +38,14 @@ import java.util.stream.Collectors;
 import 
org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
@@ -90,13 +97,19 @@ public class LoadPlan {
      * row and end row can be null. The start row is exclusive and the end row 
is inclusive (like
      * Accumulo tablets). A common use case for this would be when files were 
partitioned using a
      * table's splits. When using this range type, the start and end row must 
exist as splits in the
-     * table or an exception will be thrown at load time.
+     * table or an exception will be thrown at load time. This RangeType is 
the most efficient for
+     * Accumulo to load, and it enables only loading files to tablets that 
overlap data in the file.
      */
     TABLE,
     /**
-     * Range that correspond to known rows in a file. For this range type, the 
start row and end row
-     * must be non-null. The start row and end row are both considered 
inclusive. At load time,
-     * these data ranges will be mapped to table ranges.
+     * Range that corresponds to the minimum and maximum rows in a file. For 
this range type, the
+     * start row and end row must be non-null. The start row and end row are 
both considered
+     * inclusive. At load time, these data ranges will be mapped to table 
ranges. For this RangeType
+     * Accumulo has to do more work at load time to map the file range to tablets. 
Also, this will map a
+     * file to all tablets in the range even if the file has no data for that 
tablet. For example, if
+     * a range overlapped 10 tablets but the file only had data for 8 of those 
tablets, the file
+     * would still be loaded to all 10. This will not cause problems for scans 
or compactions other
+     * than the unnecessary work of opening a file and finding it has no data 
for the tablet.
      */
     FILE
   }
@@ -459,6 +472,7 @@ public class LoadPlan {
    * Computes a load plan for a given rfile. This will open the rfile and find 
every
    * {@link TableSplits} that overlaps rows in the file and add those to the 
returned load plan.
    *
+   * @return a load plan of type {@link RangeType#TABLE}
    * @since 2.1.4
    */
   public static LoadPlan compute(URI file, SplitResolver splitResolver) throws 
IOException {
@@ -475,6 +489,7 @@ public class LoadPlan {
    *
    * @param properties used when opening the rfile, see
    *        {@link 
org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)}
+   * @return a load plan of type {@link RangeType#TABLE}
    * @since 2.1.4
    */
   public static LoadPlan compute(URI file, Map<String,String> properties,
@@ -510,4 +525,41 @@ public class LoadPlan {
       return builder.build();
     }
   }
+
+  /**
+   * Computes a load plan for a rfile based on the minimum and maximum row 
present across all
+   * locality groups.
+   *
+   * @param properties used when opening the rfile, see
+   *        {@link 
org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)}
+   *
+   * @return a load plan of type {@link RangeType#FILE}
+   * @since 2.1.5
+   */
+  public static LoadPlan compute(URI file, Map<String,String> properties) 
throws IOException {
+    var path = new Path(file);
+    var conf = new Configuration();
+    var fs = FileSystem.get(path.toUri(), conf);
+    CryptoService cs =
+        CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, 
properties);
+    var tableConf = 
SiteConfiguration.empty().withOverrides(properties).build();
+    try (var reader = FileOperations.getInstance().newReaderBuilder()
+        .forFile(file.toString(), fs, conf, 
cs).withTableConfiguration(tableConf).build();) {
+      var firstRow = reader.getFirstKey().getRow();
+      var lastRow = reader.getLastKey().getRow();
+      return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, 
firstRow, lastRow)
+          .build();
+    }
+  }
+
+  /**
+   * Computes a load plan for a rfile based on the minimum and maximum row 
present across all
+   * locality groups.
+   *
+   * @return a load plan of type {@link RangeType#FILE}
+   * @since 2.1.5
+   */
+  public static LoadPlan compute(URI file) throws IOException {
+    return compute(file, Map.of());
+  }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java 
b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
index 58e56abf0a..ae3dccaecb 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
@@ -948,6 +948,47 @@ public class RFileClientTest {
     var expectedLoadPlan =
         LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, 
"001", "009").build();
     assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+    assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new 
URI(testFile)).toJson());
+
+    // put the first row in the default LG and last row in the first LG
+    testFile = createTmpTestFile();
+    var writer2 = 
RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+    try (writer2) {
+      writer2.startNewLocalityGroup("LG1", "F1");
+      writer2.append(new Key("007", "F1"), "V1");
+      writer2.append(new Key("009", "F1"), "V2");
+      writer2.startNewLocalityGroup("LG2", "F3");
+      writer2.append(new Key("003", "F3"), "V3");
+      writer2.append(new Key("004", "F3"), "V4");
+      writer2.startDefaultLocalityGroup();
+      writer2.append(new Key("002", "F4"), "V5");
+      writer2.append(new Key("008", "F4"), "V6");
+    }
+
+    filename = new Path(testFile).getName();
+    loadPlan = writer2.getLoadPlan(filename);
+    assertEquals(1, loadPlan.getDestinations().size());
+    expectedLoadPlan =
+        LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, 
"002", "009").build();
+    assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+    assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new 
URI(testFile)).toJson());
+
+    // create a file w/ a single LG
+    testFile = createTmpTestFile();
+    var writer3 = 
RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+    try (writer3) {
+      writer3.startDefaultLocalityGroup();
+      writer3.append(new Key("003", "F4"), "V5");
+      writer3.append(new Key("008", "F4"), "V6");
+    }
+
+    filename = new Path(testFile).getName();
+    loadPlan = writer3.getLoadPlan(filename);
+    assertEquals(1, loadPlan.getDestinations().size());
+    expectedLoadPlan =
+        LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, 
"003", "008").build();
+    assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+    assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new 
URI(testFile)).toJson());
   }
 
   @Test
diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java 
b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
index 08ef7f660f..a27e99a227 100644
--- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
@@ -48,8 +48,11 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import javax.crypto.Cipher;
 import javax.crypto.NoSuchPaddingException;
@@ -67,6 +70,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.spi.crypto.AESCryptoService;
@@ -85,6 +89,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
@@ -357,6 +362,22 @@ public class CryptoTest {
     assertEquals(1, summary.getStatistics().size());
     assertEquals(0, summary.getFileStatistics().getInaccurate());
     assertEquals(1, summary.getFileStatistics().getTotal());
+
+    // test computing load plan for encrypted files
+    var absUri = new Path(file).makeQualified(fs.getUri(), 
fs.getWorkingDirectory()).toUri();
+    var loadPlan = LoadPlan.compute(absUri, 
cryptoOnConf.getAllCryptoProperties());
+    var expectedLoadPlan =
+        LoadPlan.builder().loadFileTo("testFile1.rf", LoadPlan.RangeType.FILE, 
"a", "a3").build();
+    assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+
+    var splits =
+        Stream.of("a", "b", 
"c").map(Text::new).collect(Collectors.toCollection(TreeSet::new));
+    var resolver = LoadPlan.SplitResolver.from(splits);
+    var loadPlan2 = LoadPlan.compute(absUri, 
cryptoOnConf.getAllCryptoProperties(), resolver);
+    var expectedLoadPlan2 =
+        LoadPlan.builder().loadFileTo("testFile1.rf", 
LoadPlan.RangeType.TABLE, null, "a")
+            .loadFileTo("testFile1.rf", LoadPlan.RangeType.TABLE, "a", 
"b").build();
+    assertEquals(expectedLoadPlan2.toJson(), loadPlan2.toJson());
   }
 
   @Test

Reply via email to