This is an automated email from the ASF dual-hosted git repository.

cconnell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new e85897faf93 HBASE-29432: Provide mechanism to plug in rack or host 
locality logic into ExportSnapshot (#7129)
e85897faf93 is described below

commit e85897faf93a01a311d745e9e6a22c3874df5c91
Author: Charles Connell <char...@charlesconnell.com>
AuthorDate: Tue Jul 1 10:19:54 2025 -0400

    HBASE-29432: Provide mechanism to plug in rack or host locality logic into 
ExportSnapshot (#7129)
    
    Signed-off-by: Wellington Ramos Chevreuil <wchevre...@apache.org>
    Signed-off-by: Ray Mattingly <rmattin...@apache.org>
    Reviewed-by: Nick Dimiduk <ndimi...@apache.org>
---
 .../hadoop/hbase/snapshot/ExportSnapshot.java      | 177 ++++++++++++++++++---
 .../hbase/snapshot/TestExportSnapshotHelpers.java  | 175 +++++++++++++++++++-
 2 files changed, 326 insertions(+), 26 deletions(-)

diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index b35a28e0d2d..9616bb1605a 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -37,6 +38,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -75,12 +77,15 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
 
@@ -118,6 +123,10 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
   private static final String CONF_MAP_GROUP = 
"snapshot.export.default.map.group";
   private static final String CONF_BANDWIDTH_MB = 
"snapshot.export.map.bandwidth.mb";
   private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
+  private static final String CONF_INPUT_FILE_GROUPER_CLASS =
+    "snapshot.export.input.file.grouper.class";
+  private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS =
+    "snapshot.export.input.file.location.resolver.class";
   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
   private static final String CONF_COPY_MANIFEST_THREADS =
     "snapshot.export.copy.references.threads";
@@ -156,13 +165,21 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
     static final Option CHMOD =
       new Option(null, "chmod", true, "Change the permission of the files to 
the specified one.");
     static final Option MAPPERS = new Option(null, "mappers", true,
-      "Number of mappers to use during the copy (mapreduce.job.maps).");
+      "Number of mappers to use during the copy (mapreduce.job.maps). "
+        + "If you provide a --custom-file-grouper, "
+        + "then --mappers is interpreted as the number of mappers per group.");
     static final Option BANDWIDTH =
       new Option(null, "bandwidth", true, "Limit bandwidth to this value in 
MB/second.");
     static final Option RESET_TTL =
       new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
     static final Option STORAGE_POLICY = new Option(null, "storage-policy", 
true,
       "Storage policy for export snapshot output directory, with format like: 
f=HOT&g=ALL_SDD");
+    static final Option CUSTOM_FILE_GROUPER = new Option(null, 
"custom-file-grouper", true,
+      "Fully qualified class name of an implementation of 
ExportSnapshot.CustomFileGrouper. "
+        + "See JavaDoc on that class for more information.");
+    static final Option FILE_LOCATION_RESOLVER = new Option(null, 
"file-location-resolver", true,
+      "Fully qualified class name of an implementation of 
ExportSnapshot.FileLocationResolver. "
+        + "See JavaDoc on that class for more information.");
   }
 
   // Export Map-Reduce Counters, to keep track of the progress
@@ -185,6 +202,54 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
     INCOMPATIBLE, // checksum comparison is not compatible.
   }
 
+  /**
+   * If desired, you may implement a CustomFileGrouper in order to influence 
how ExportSnapshot
+   * chooses which input files go into the MapReduce job's {@link 
InputSplit}s. Your implementation
+   * must return a data structure that contains each input file exactly once. 
Files that appear in
+   * separate entries in the top-level returned Collection are guaranteed to 
not be placed in the
+   * same InputSplit. This can be used to segregate your input files by the 
rack or host on which
+   * they are available, which, used in conjunction with {@link 
FileLocationResolver}, can improve
+   * the performance of your ExportSnapshot runs. To use this, pass the 
--custom-file-grouper
+   * argument with the fully qualified class name of an implementation of 
CustomFileGrouper that's
+   * on the classpath. If this argument is not used, no particular grouping 
logic will be applied.
+   */
+  @InterfaceAudience.Public
+  public interface CustomFileGrouper {
+    Collection<Collection<Pair<SnapshotFileInfo, Long>>>
+      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> 
snapshotFiles);
+  }
+
+  private static class NoopCustomFileGrouper implements CustomFileGrouper {
+    @Override
+    public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
+      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> 
snapshotFiles) {
+      return ImmutableList.of(snapshotFiles);
+    }
+  }
+
+  /**
+   * If desired, you may implement a FileLocationResolver in order to 
influence the _location_
+   * metadata attached to each {@link InputSplit} that ExportSnapshot will 
submit to YARN. The
+   * method {@link #getLocationsForInputFiles(Collection)} method is called 
once for each InputSplit
+   * being constructed. Whatever is returned will ultimately be reported by 
that split's
+   * {@link InputSplit#getLocations()} method. This can be used to encourage 
YARN to schedule the
+   * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use 
this, pass the
+   * --file-location-resolver argument with the fully qualified class name of 
an implementation of
+   * FileLocationResolver that's on the classpath. If this argument is not 
used, no locations will
+   * be attached to the InputSplits.
+   */
+  @InterfaceAudience.Public
+  public interface FileLocationResolver {
+    Set<String> getLocationsForInputFiles(final 
Collection<Pair<SnapshotFileInfo, Long>> files);
+  }
+
+  static class NoopFileLocationResolver implements FileLocationResolver {
+    @Override
+    public Set<String> 
getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
+      return ImmutableSet.of();
+    }
+  }
+
   private static class ExportMapper
     extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ExportMapper.class);
@@ -721,8 +786,9 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
    * The algorithm used is pretty straightforward; the file list is sorted by 
size, and then each
    * group fetch the bigger file available, iterating through groups 
alternating the direction.
    */
-  static List<List<Pair<SnapshotFileInfo, Long>>>
-    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final 
int ngroups) {
+  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
+    final Collection<Pair<SnapshotFileInfo, Long>> unsortedFiles, final int 
ngroups) {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(unsortedFiles);
     // Sort files by size, from small to big
     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
       public int compare(Pair<SnapshotFileInfo, Long> a, 
Pair<SnapshotFileInfo, Long> b) {
@@ -733,7 +799,6 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
 
     // create balanced groups
     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
-    long[] sizeGroups = new long[ngroups];
     int hi = files.size() - 1;
     int lo = 0;
 
@@ -752,7 +817,6 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
 
       // add the hi one
-      sizeGroups[g] += fileInfo.getSecond();
       group.add(fileInfo);
 
       // change direction when at the end or the beginning
@@ -766,16 +830,10 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
       }
     }
 
-    if (LOG.isDebugEnabled()) {
-      for (int i = 0; i < sizeGroups.length; ++i) {
-        LOG.debug("export split=" + i + " size=" + 
Strings.humanReadableInt(sizeGroups[i]));
-      }
-    }
-
     return fileGroups;
   }
 
-  private static class ExportSnapshotInputFormat extends 
InputFormat<BytesWritable, NullWritable> {
+  static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, 
NullWritable> {
     @Override
     public RecordReader<BytesWritable, NullWritable> 
createRecordReader(InputSplit split,
       TaskAttemptContext tac) throws IOException, InterruptedException {
@@ -789,37 +847,78 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
 
       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = 
getSnapshotFiles(conf, fs, snapshotDir);
+
+      Collection<List<Pair<SnapshotFileInfo, Long>>> balancedGroups =
+        groupFilesForSplits(conf, snapshotFiles);
+
+      Class<? extends FileLocationResolver> fileLocationResolverClass =
+        conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, 
NoopFileLocationResolver.class,
+          FileLocationResolver.class);
+      FileLocationResolver fileLocationResolver =
+        ReflectionUtils.newInstance(fileLocationResolverClass, conf);
+      LOG.info("FileLocationResolver {} will provide location metadata for 
each InputSplit",
+        fileLocationResolverClass);
+
+      List<InputSplit> splits = new ArrayList<>(balancedGroups.size());
+      for (Collection<Pair<SnapshotFileInfo, Long>> files : balancedGroups) {
+        splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver));
+      }
+      return splits;
+    }
+
+    Collection<List<Pair<SnapshotFileInfo, Long>>> 
groupFilesForSplits(Configuration conf,
+      List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
-      if (mappers == 0 && snapshotFiles.size() > 0) {
+      if (mappers == 0 && !snapshotFiles.isEmpty()) {
         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
         mappers = Math.min(mappers, snapshotFiles.size());
         conf.setInt(CONF_NUM_SPLITS, mappers);
         conf.setInt(MR_NUM_MAPS, mappers);
       }
 
-      List<List<Pair<SnapshotFileInfo, Long>>> groups = 
getBalancedSplits(snapshotFiles, mappers);
-      List<InputSplit> splits = new ArrayList(groups.size());
-      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
-        splits.add(new ExportSnapshotInputSplit(files));
-      }
-      return splits;
+      Class<? extends CustomFileGrouper> inputFileGrouperClass = conf.getClass(
+        CONF_INPUT_FILE_GROUPER_CLASS, NoopCustomFileGrouper.class, 
CustomFileGrouper.class);
+      CustomFileGrouper customFileGrouper =
+        ReflectionUtils.newInstance(inputFileGrouperClass, conf);
+      Collection<Collection<Pair<SnapshotFileInfo, Long>>> groups =
+        customFileGrouper.getGroupedInputFiles(snapshotFiles);
+
+      LOG.info("CustomFileGrouper {} split input files into {} groups", 
inputFileGrouperClass,
+        groups.size());
+      int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / 
groups.size(), 1);
+      LOG.info(
+        "Splitting each group into {} InputSplits, "
+          + "to achieve closest possible amount of mappers to target of {}",
+        mappersPerGroup, mappers);
+
+      // Within each group, create splits of equal size. Groups are not mixed 
together.
+      return groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup))
+        .flatMap(Collection::stream).collect(Collectors.toList());
     }
 
-    private static class ExportSnapshotInputSplit extends InputSplit 
implements Writable {
+    static class ExportSnapshotInputSplit extends InputSplit implements 
Writable {
+
       private List<Pair<BytesWritable, Long>> files;
+      private String[] locations;
       private long length;
 
       public ExportSnapshotInputSplit() {
         this.files = null;
+        this.locations = null;
       }
 
-      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> 
snapshotFiles) {
-        this.files = new ArrayList(snapshotFiles.size());
+      public ExportSnapshotInputSplit(final Collection<Pair<SnapshotFileInfo, 
Long>> snapshotFiles,
+        FileLocationResolver fileLocationResolver) {
+        this.files = new ArrayList<>(snapshotFiles.size());
         for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
           this.files.add(
             new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), 
fileInfo.getSecond()));
           this.length += fileInfo.getSecond();
         }
+        this.locations =
+          
fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new 
String[0]);
+        LOG.trace("This ExportSnapshotInputSplit has files {} of collective 
size {}, "
+          + "with location hints: {}", files, length, locations);
       }
 
       private List<Pair<BytesWritable, Long>> getSplitKeys() {
@@ -833,7 +932,7 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
 
       @Override
       public String[] getLocations() throws IOException, InterruptedException {
-        return new String[] {};
+        return locations;
       }
 
       @Override
@@ -848,6 +947,12 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
           files.add(new Pair<>(fileInfo, size));
           length += size;
         }
+        int locationCount = in.readInt();
+        List<String> locations = new ArrayList<>(locationCount);
+        for (int i = 0; i < locationCount; ++i) {
+          locations.add(in.readUTF());
+        }
+        this.locations = locations.toArray(new String[0]);
       }
 
       @Override
@@ -857,6 +962,10 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
           fileInfo.getFirst().write(out);
           out.writeLong(fileInfo.getSecond());
         }
+        out.writeInt(locations.length);
+        for (String location : locations) {
+          out.writeUTF(location);
+        }
       }
     }
 
@@ -917,7 +1026,8 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
   private void runCopyJob(final Path inputRoot, final Path outputRoot, final 
String snapshotName,
     final Path snapshotDir, final boolean verifyChecksum, final String 
filesUser,
     final String filesGroup, final int filesMode, final int mappers, final int 
bandwidthMB,
-    final String storagePolicy) throws IOException, InterruptedException, 
ClassNotFoundException {
+    final String storagePolicy, final String customFileGrouper, final String 
fileLocationResolver)
+    throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@@ -937,6 +1047,12 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
         conf.set(generateFamilyStoragePolicyKey(entry.getKey()), 
entry.getValue());
       }
     }
+    if (customFileGrouper != null) {
+      conf.set(CONF_INPUT_FILE_GROUPER_CLASS, customFileGrouper);
+    }
+    if (fileLocationResolver != null) {
+      conf.set(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, fileLocationResolver);
+    }
 
     String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + 
snapshotName);
     Job job = new Job(conf);
@@ -1055,6 +1171,8 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
   private int mappers = 0;
   private boolean resetTtl = false;
   private String storagePolicy = null;
+  private String customFileGrouper = null;
+  private String fileLocationResolver = null;
 
   @Override
   protected void processOptions(CommandLine cmd) {
@@ -1080,6 +1198,12 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
     if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
       storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
     }
+    if (cmd.hasOption(Options.CUSTOM_FILE_GROUPER.getLongOpt())) {
+      customFileGrouper = 
cmd.getOptionValue(Options.CUSTOM_FILE_GROUPER.getLongOpt());
+    }
+    if (cmd.hasOption(Options.FILE_LOCATION_RESOLVER.getLongOpt())) {
+      fileLocationResolver = 
cmd.getOptionValue(Options.FILE_LOCATION_RESOLVER.getLongOpt());
+    }
   }
 
   /**
@@ -1248,7 +1372,8 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
     // by the HFileArchiver, since they have no references.
     try {
       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, 
verifyChecksum, filesUser,
-        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy);
+        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy, 
customFileGrouper,
+        fileLocationResolver);
 
       LOG.info("Finalize the Snapshot Export");
       if (!skipTmp) {
@@ -1306,6 +1431,8 @@ public class ExportSnapshot extends AbstractHBaseTool 
implements Tool {
     addOption(Options.MAPPERS);
     addOption(Options.BANDWIDTH);
     addOption(Options.RESET_TTL);
+    addOption(Options.CUSTOM_FILE_GROUPER);
+    addOption(Options.FILE_LOCATION_RESOLVER);
   }
 
   public static void main(String[] args) {
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
index 71402d0989d..72ca0c3f7c2 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
@@ -18,9 +18,14 @@
 package org.apache.hadoop.hbase.snapshot;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -48,7 +53,7 @@ public class TestExportSnapshotHelpers {
    * assign to each group a file, going back and forth through the groups.
    */
   @Test
-  public void testBalanceSplit() throws Exception {
+  public void testBalanceSplit() {
     // Create a list of files
     List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(21);
     for (long i = 0; i <= 20; i++) {
@@ -89,4 +94,172 @@ public class TestExportSnapshotHelpers {
     }
     assertEquals(expectedSize, totalSize);
   }
+
+  @Test
+  public void testGroupFilesForSplitsWithoutCustomFileGrouper() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 10; i++) {
+      SnapshotFileInfo fileInfo = 
SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 10));
+    }
+
+    Configuration conf = new Configuration();
+    conf.setInt("snapshot.export.format.splits", 3);
+
+    ExportSnapshot.ExportSnapshotInputFormat inputFormat =
+      new ExportSnapshot.ExportSnapshotInputFormat();
+    Collection<List<Pair<SnapshotFileInfo, Long>>> groups =
+      inputFormat.groupFilesForSplits(conf, files);
+
+    assertEquals("Should create 3 groups", 3, groups.size());
+
+    long totalSize = 0;
+    int totalFiles = 0;
+    for (List<Pair<SnapshotFileInfo, Long>> group : groups) {
+      for (Pair<SnapshotFileInfo, Long> file : group) {
+        totalSize += file.getSecond();
+        totalFiles++;
+      }
+    }
+
+    assertEquals("All files should be included", 10, totalFiles);
+    assertEquals("Total size should be preserved", 450, totalSize);
+  }
+
+  @Test
+  public void testGroupFilesForSplitsWithCustomFileGrouper() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 8; i++) {
+      SnapshotFileInfo fileInfo = 
SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 5));
+    }
+
+    Configuration conf = new Configuration();
+    conf.setInt("snapshot.export.format.splits", 4);
+    conf.setClass("snapshot.export.input.file.grouper.class", 
TestCustomFileGrouper.class,
+      ExportSnapshot.CustomFileGrouper.class);
+
+    ExportSnapshot.ExportSnapshotInputFormat inputFormat =
+      new ExportSnapshot.ExportSnapshotInputFormat();
+    Collection<List<Pair<SnapshotFileInfo, Long>>> groups =
+      inputFormat.groupFilesForSplits(conf, files);
+
+    assertEquals("Should create splits based on custom grouper output", 4, 
groups.size());
+
+    long totalSize = 0;
+    int totalFiles = 0;
+    for (List<Pair<SnapshotFileInfo, Long>> group : groups) {
+      for (Pair<SnapshotFileInfo, Long> file : group) {
+        totalSize += file.getSecond();
+        totalFiles++;
+      }
+    }
+
+    assertEquals("All files should be included", 8, totalFiles);
+    assertEquals("Total size should be preserved", 140, totalSize);
+  }
+
+  @Test
+  public void testFileLocationResolverWithNoopResolver() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 3; i++) {
+      SnapshotFileInfo fileInfo = 
SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 10));
+    }
+
+    ExportSnapshot.NoopFileLocationResolver resolver =
+      new ExportSnapshot.NoopFileLocationResolver();
+    Set<String> locations = resolver.getLocationsForInputFiles(files);
+
+    assertTrue("NoopFileLocationResolver should return empty locations", 
locations.isEmpty());
+  }
+
+  @Test
+  public void testFileLocationResolverWithCustomResolver() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 3; i++) {
+      SnapshotFileInfo fileInfo = 
SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 10));
+    }
+
+    TestFileLocationResolver resolver = new TestFileLocationResolver();
+    Set<String> locations = resolver.getLocationsForInputFiles(files);
+
+    assertEquals("Should return expected locations", 2, locations.size());
+    assertTrue("Should contain rack1", locations.contains("rack1"));
+    assertTrue("Should contain rack2", locations.contains("rack2"));
+  }
+
+  @Test
+  public void testInputSplitWithFileLocationResolver() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 3; i++) {
+      SnapshotFileInfo fileInfo = 
SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 10));
+    }
+
+    TestFileLocationResolver resolver = new TestFileLocationResolver();
+    ExportSnapshot.ExportSnapshotInputFormat.ExportSnapshotInputSplit split =
+      new 
ExportSnapshot.ExportSnapshotInputFormat.ExportSnapshotInputSplit(files, 
resolver);
+
+    try {
+      String[] locations = split.getLocations();
+      assertEquals("Should return 2 locations", 2, locations.length);
+
+      boolean hasRack1 = false;
+      boolean hasRack2 = false;
+      for (String location : locations) {
+        if ("rack1".equals(location)) {
+          hasRack1 = true;
+        }
+        if ("rack2".equals(location)) {
+          hasRack2 = true;
+        }
+      }
+
+      assertTrue("Should contain rack1", hasRack1);
+      assertTrue("Should contain rack2", hasRack2);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get locations", e);
+    }
+  }
+
+  public static class TestCustomFileGrouper implements 
ExportSnapshot.CustomFileGrouper {
+    @Override
+    public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
+      getGroupedInputFiles(Collection<Pair<SnapshotFileInfo, Long>> 
snapshotFiles) {
+      List<Collection<Pair<SnapshotFileInfo, Long>>> groups = new 
ArrayList<>();
+      List<Pair<SnapshotFileInfo, Long>> group1 = new ArrayList<>();
+      List<Pair<SnapshotFileInfo, Long>> group2 = new ArrayList<>();
+
+      int count = 0;
+      for (Pair<SnapshotFileInfo, Long> file : snapshotFiles) {
+        if (count % 2 == 0) {
+          group1.add(file);
+        } else {
+          group2.add(file);
+        }
+        count++;
+      }
+
+      groups.add(group1);
+      groups.add(group2);
+      return groups;
+    }
+  }
+
+  public static class TestFileLocationResolver implements 
ExportSnapshot.FileLocationResolver {
+    @Override
+    public Set<String> 
getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
+      Set<String> locations = new HashSet<>();
+      locations.add("rack1");
+      locations.add("rack2");
+      return locations;
+    }
+  }
 }

Reply via email to