This is an automated email from the ASF dual-hosted git repository.

cconnell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

The following commit(s) were added to refs/heads/branch-2 by this push:
     new e85897faf93  HBASE-29432: Provide mechanism to plug in rack or host locality logic into ExportSnapshot (#7129)
e85897faf93 is described below

commit e85897faf93a01a311d745e9e6a22c3874df5c91
Author: Charles Connell <char...@charlesconnell.com>
AuthorDate: Tue Jul 1 10:19:54 2025 -0400

    HBASE-29432: Provide mechanism to plug in rack or host locality logic into ExportSnapshot (#7129)

    Signed-off-by: Wellington Ramos Chevreuil <wchevre...@apache.org>
    Signed-off-by: Ray Mattingly <rmattin...@apache.org>
    Reviewed-by: Nick Dimiduk <ndimi...@apache.org>
---
 .../hadoop/hbase/snapshot/ExportSnapshot.java      | 177 ++++++++++++++++++---
 .../hbase/snapshot/TestExportSnapshotHelpers.java  | 175 +++++++++++++++++++-
 2 files changed, 326 insertions(+), 26 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index b35a28e0d2d..9616bb1605a 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -37,6 +38,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -75,12 +77,15 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
 
@@ -118,6 +123,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
   private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
+  private static final String CONF_INPUT_FILE_GROUPER_CLASS =
+    "snapshot.export.input.file.grouper.class";
+  private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS =
+    "snapshot.export.input.file.location.resolver.class";
   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
   private static final String CONF_COPY_MANIFEST_THREADS =
     "snapshot.export.copy.references.threads";
@@ -156,13 +165,21 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     static final Option CHMOD = new Option(null, "chmod", true,
       "Change the permission of the files to the specified one.");
one."); static final Option MAPPERS = new Option(null, "mappers", true, - "Number of mappers to use during the copy (mapreduce.job.maps)."); + "Number of mappers to use during the copy (mapreduce.job.maps). " + + "If you provide a --custom-file-grouper, " + + "then --mappers is interpreted as the number of mappers per group."); static final Option BANDWIDTH = new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); static final Option RESET_TTL = new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot"); static final Option STORAGE_POLICY = new Option(null, "storage-policy", true, "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SDD"); + static final Option CUSTOM_FILE_GROUPER = new Option(null, "custom-file-grouper", true, + "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. " + + "See JavaDoc on that class for more information."); + static final Option FILE_LOCATION_RESOLVER = new Option(null, "file-location-resolver", true, + "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. " + + "See JavaDoc on that class for more information."); } // Export Map-Reduce Counters, to keep track of the progress @@ -185,6 +202,54 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool { INCOMPATIBLE, // checksum comparison is not compatible. } + /** + * If desired, you may implement a CustomFileGrouper in order to influence how ExportSnapshot + * chooses which input files go into the MapReduce job's {@link InputSplit}s. Your implementation + * must return a data structure that contains each input file exactly once. Files that appear in + * separate entries in the top-level returned Collection are guaranteed to not be placed in the + * same InputSplit. This can be used to segregate your input files by the rack or host on which + * they are available, which, used in conjunction with {@link FileLocationResolver}, can improve + * the performance of your ExportSnapshot runs. To use this, pass the --custom-file-grouper + * argument with the fully qualified class name of an implementation of CustomFileGrouper that's + * on the classpath. If this argument is not used, no particular grouping logic will be applied. + */ + @InterfaceAudience.Public + public interface CustomFileGrouper { + Collection<Collection<Pair<SnapshotFileInfo, Long>>> + getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles); + } + + private static class NoopCustomFileGrouper implements CustomFileGrouper { + @Override + public Collection<Collection<Pair<SnapshotFileInfo, Long>>> + getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) { + return ImmutableList.of(snapshotFiles); + } + } + + /** + * If desired, you may implement a FileLocationResolver in order to influence the _location_ + * metadata attached to each {@link InputSplit} that ExportSnapshot will submit to YARN. The + * method {@link #getLocationsForInputFiles(Collection)} method is called once for each InputSplit + * being constructed. Whatever is returned will ultimately be reported by that split's + * {@link InputSplit#getLocations()} method. This can be used to encourage YARN to schedule the + * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use this, pass the + * --file-location-resolver argument with the fully qualified class name of an implementation of + * FileLocationResolver that's on the classpath. 
If this argument is not used, no locations will + * be attached to the InputSplits. + */ + @InterfaceAudience.Public + public interface FileLocationResolver { + Set<String> getLocationsForInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> files); + } + + static class NoopFileLocationResolver implements FileLocationResolver { + @Override + public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) { + return ImmutableSet.of(); + } + } + private static class ExportMapper extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); @@ -721,8 +786,9 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool { * The algorithm used is pretty straightforward; the file list is sorted by size, and then each * group fetch the bigger file available, iterating through groups alternating the direction. */ - static List<List<Pair<SnapshotFileInfo, Long>>> - getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { + static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits( + final Collection<Pair<SnapshotFileInfo, Long>> unsortedFiles, final int ngroups) { + List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(unsortedFiles); // Sort files by size, from small to big Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { @@ -733,7 +799,6 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool { // create balanced groups List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); - long[] sizeGroups = new long[ngroups]; int hi = files.size() - 1; int lo = 0; @@ -752,7 +817,6 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool { Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); // add the hi one - sizeGroups[g] += fileInfo.getSecond(); group.add(fileInfo); // change direction when at the end or the beginning @@ -766,16 +830,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool { } } - if (LOG.isDebugEnabled()) { - for (int i = 0; i < sizeGroups.length; ++i) { - LOG.debug("export split=" + i + " size=" + Strings.humanReadableInt(sizeGroups[i])); - } - } - return fileGroups; } - private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { + static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { @Override public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext tac) throws IOException, InterruptedException { @@ -789,37 +847,78 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool { FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); + + Collection<List<Pair<SnapshotFileInfo, Long>>> balancedGroups = + groupFilesForSplits(conf, snapshotFiles); + + Class<? 
+      Class<? extends FileLocationResolver> fileLocationResolverClass =
+        conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class,
+          FileLocationResolver.class);
+      FileLocationResolver fileLocationResolver =
+        ReflectionUtils.newInstance(fileLocationResolverClass, conf);
+      LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit",
+        fileLocationResolverClass);
+
+      List<InputSplit> splits = new ArrayList<>(balancedGroups.size());
+      for (Collection<Pair<SnapshotFileInfo, Long>> files : balancedGroups) {
+        splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver));
+      }
+      return splits;
+    }
+
+    Collection<List<Pair<SnapshotFileInfo, Long>>> groupFilesForSplits(Configuration conf,
+      List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
-      if (mappers == 0 && snapshotFiles.size() > 0) {
+      if (mappers == 0 && !snapshotFiles.isEmpty()) {
         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
         mappers = Math.min(mappers, snapshotFiles.size());
         conf.setInt(CONF_NUM_SPLITS, mappers);
         conf.setInt(MR_NUM_MAPS, mappers);
       }
 
-      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
-      List<InputSplit> splits = new ArrayList(groups.size());
-      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
-        splits.add(new ExportSnapshotInputSplit(files));
-      }
-      return splits;
+      Class<? extends CustomFileGrouper> inputFileGrouperClass = conf.getClass(
+        CONF_INPUT_FILE_GROUPER_CLASS, NoopCustomFileGrouper.class, CustomFileGrouper.class);
+      CustomFileGrouper customFileGrouper =
+        ReflectionUtils.newInstance(inputFileGrouperClass, conf);
+      Collection<Collection<Pair<SnapshotFileInfo, Long>>> groups =
+        customFileGrouper.getGroupedInputFiles(snapshotFiles);
+
+      LOG.info("CustomFileGrouper {} split input files into {} groups", inputFileGrouperClass,
+        groups.size());
+      int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / groups.size(), 1);
+      LOG.info(
+        "Splitting each group into {} InputSplits, "
+          + "to get the number of mappers as close as possible to the target of {}",
+        mappersPerGroup, mappers);
+
+      // Within each group, create splits of equal size. Groups are not mixed together.
+      return groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup))
+        .flatMap(Collection::stream).collect(Collectors.toList());
     }
 
-    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
+    static class ExportSnapshotInputSplit extends InputSplit implements Writable {
+
       private List<Pair<BytesWritable, Long>> files;
+      private String[] locations;
       private long length;
 
       public ExportSnapshotInputSplit() {
         this.files = null;
+        this.locations = null;
       }
 
-      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
-        this.files = new ArrayList(snapshotFiles.size());
+      public ExportSnapshotInputSplit(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles,
+        FileLocationResolver fileLocationResolver) {
+        this.files = new ArrayList<>(snapshotFiles.size());
         for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
           this.files.add(
             new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
           this.length += fileInfo.getSecond();
         }
+        this.locations =
+          fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new String[0]);
+        LOG.trace("This ExportSnapshotInputSplit has files {} of collective size {}, "
+          + "with location hints: {}", files, length, locations);
       }
 
       private List<Pair<BytesWritable, Long>> getSplitKeys() {
@@ -833,7 +932,7 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
 
       @Override
       public String[] getLocations() throws IOException, InterruptedException {
-        return new String[] {};
+        return locations;
       }
 
       @Override
@@ -848,6 +947,12 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
           files.add(new Pair<>(fileInfo, size));
           length += size;
         }
+        int locationCount = in.readInt();
+        List<String> locations = new ArrayList<>(locationCount);
+        for (int i = 0; i < locationCount; ++i) {
+          locations.add(in.readUTF());
+        }
+        this.locations = locations.toArray(new String[0]);
       }
 
       @Override
@@ -857,6 +962,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
           fileInfo.getFirst().write(out);
           out.writeLong(fileInfo.getSecond());
         }
+        out.writeInt(locations.length);
+        for (String location : locations) {
+          out.writeUTF(location);
+        }
       }
     }
 
@@ -917,7 +1026,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
     final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
     final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
-    final String storagePolicy) throws IOException, InterruptedException, ClassNotFoundException {
+    final String storagePolicy, final String customFileGrouper, final String fileLocationResolver)
+    throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@@ -937,6 +1047,12 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
         conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
       }
     }
+    if (customFileGrouper != null) {
+      conf.set(CONF_INPUT_FILE_GROUPER_CLASS, customFileGrouper);
+    }
+    if (fileLocationResolver != null) {
+      conf.set(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, fileLocationResolver);
+    }
 
     String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
     Job job = new Job(conf);
@@ -1055,6 +1171,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private int mappers = 0;
   private boolean resetTtl = false;
   private String storagePolicy = null;
+  private String customFileGrouper = null;
+  private String fileLocationResolver = null;
 
   @Override
   protected void processOptions(CommandLine cmd) {
@@ -1080,6 +1198,12 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
       storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
     }
+    if (cmd.hasOption(Options.CUSTOM_FILE_GROUPER.getLongOpt())) {
+      customFileGrouper = cmd.getOptionValue(Options.CUSTOM_FILE_GROUPER.getLongOpt());
+    }
+    if (cmd.hasOption(Options.FILE_LOCATION_RESOLVER.getLongOpt())) {
+      fileLocationResolver = cmd.getOptionValue(Options.FILE_LOCATION_RESOLVER.getLongOpt());
+    }
   }
 
   /**
@@ -1248,7 +1372,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     // by the HFileArchiver, since they have no references.
     try {
       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
-        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy);
+        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy, customFileGrouper,
+        fileLocationResolver);
 
       LOG.info("Finalize the Snapshot Export");
       if (!skipTmp) {
@@ -1306,6 +1431,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     addOption(Options.MAPPERS);
     addOption(Options.BANDWIDTH);
     addOption(Options.RESET_TTL);
+    addOption(Options.CUSTOM_FILE_GROUPER);
+    addOption(Options.FILE_LOCATION_RESOLVER);
   }
 
   public static void main(String[] args) {
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
index 71402d0989d..72ca0c3f7c2 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
@@ -18,9 +18,14 @@ package org.apache.hadoop.hbase.snapshot;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -48,7 +53,7 @@ public class TestExportSnapshotHelpers {
    * assign to each group a file, going back and forth through the groups.
    */
   @Test
-  public void testBalanceSplit() throws Exception {
+  public void testBalanceSplit() {
     // Create a list of files
     List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(21);
     for (long i = 0; i <= 20; i++) {
@@ -89,4 +94,172 @@ public class TestExportSnapshotHelpers {
     }
     assertEquals(expectedSize, totalSize);
   }
+
+  @Test
+  public void testGroupFilesForSplitsWithoutCustomFileGrouper() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 10; i++) {
+      SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 10));
+    }
+
+    Configuration conf = new Configuration();
+    conf.setInt("snapshot.export.format.splits", 3);
+
+    ExportSnapshot.ExportSnapshotInputFormat inputFormat =
+      new ExportSnapshot.ExportSnapshotInputFormat();
+    Collection<List<Pair<SnapshotFileInfo, Long>>> groups =
+      inputFormat.groupFilesForSplits(conf, files);
+
+    assertEquals("Should create 3 groups", 3, groups.size());
+
+    long totalSize = 0;
+    int totalFiles = 0;
+    for (List<Pair<SnapshotFileInfo, Long>> group : groups) {
+      for (Pair<SnapshotFileInfo, Long> file : group) {
+        totalSize += file.getSecond();
+        totalFiles++;
+      }
+    }
+
+    assertEquals("All files should be included", 10, totalFiles);
+    assertEquals("Total size should be preserved", 450, totalSize);
+  }
+
+  @Test
+  public void testGroupFilesForSplitsWithCustomFileGrouper() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 8; i++) {
+      SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 5));
+    }
+
+    Configuration conf = new Configuration();
+    conf.setInt("snapshot.export.format.splits", 4);
+    conf.setClass("snapshot.export.input.file.grouper.class", TestCustomFileGrouper.class,
+      ExportSnapshot.CustomFileGrouper.class);
+
+    ExportSnapshot.ExportSnapshotInputFormat inputFormat =
+      new ExportSnapshot.ExportSnapshotInputFormat();
+    Collection<List<Pair<SnapshotFileInfo, Long>>> groups =
+      inputFormat.groupFilesForSplits(conf, files);
+
+    assertEquals("Should create splits based on custom grouper output", 4, groups.size());
+
+    long totalSize = 0;
+    int totalFiles = 0;
+    for (List<Pair<SnapshotFileInfo, Long>> group : groups) {
+      for (Pair<SnapshotFileInfo, Long> file : group) {
+        totalSize += file.getSecond();
+        totalFiles++;
+      }
+    }
+
+    assertEquals("All files should be included", 8, totalFiles);
+    assertEquals("Total size should be preserved", 140, totalSize);
+  }
+
+  @Test
+  public void testFileLocationResolverWithNoopResolver() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 3; i++) {
+      SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 10));
+    }
+
+    ExportSnapshot.NoopFileLocationResolver resolver =
+      new ExportSnapshot.NoopFileLocationResolver();
+    Set<String> locations = resolver.getLocationsForInputFiles(files);
+
+    assertTrue("NoopFileLocationResolver should return empty locations", locations.isEmpty());
+  }
+
+  @Test
+  public void testFileLocationResolverWithCustomResolver() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 3; i++) {
+      SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 10));
+    }
+
+    TestFileLocationResolver resolver = new TestFileLocationResolver();
+    Set<String> locations = resolver.getLocationsForInputFiles(files);
+
+    assertEquals("Should return expected locations", 2, locations.size());
+    assertTrue("Should contain rack1", locations.contains("rack1"));
+    assertTrue("Should contain rack2", locations.contains("rack2"));
+  }
+
+  @Test
+  public void testInputSplitWithFileLocationResolver() {
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
+    for (long i = 0; i < 3; i++) {
+      SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i).build();
+      files.add(new Pair<>(fileInfo, i * 10));
+    }
+
+    TestFileLocationResolver resolver = new TestFileLocationResolver();
+    ExportSnapshot.ExportSnapshotInputFormat.ExportSnapshotInputSplit split =
+      new ExportSnapshot.ExportSnapshotInputFormat.ExportSnapshotInputSplit(files, resolver);
+
+    try {
+      String[] locations = split.getLocations();
+      assertEquals("Should return 2 locations", 2, locations.length);
+
+      boolean hasRack1 = false;
+      boolean hasRack2 = false;
+      for (String location : locations) {
+        if ("rack1".equals(location)) {
+          hasRack1 = true;
+        }
+        if ("rack2".equals(location)) {
+          hasRack2 = true;
+        }
+      }
+
+      assertTrue("Should contain rack1", hasRack1);
+      assertTrue("Should contain rack2", hasRack2);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get locations", e);
+    }
+  }
+
+  public static class TestCustomFileGrouper implements ExportSnapshot.CustomFileGrouper {
+    @Override
+    public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
+      getGroupedInputFiles(Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
+      List<Collection<Pair<SnapshotFileInfo, Long>>> groups = new ArrayList<>();
+      List<Pair<SnapshotFileInfo, Long>> group1 = new ArrayList<>();
+      List<Pair<SnapshotFileInfo, Long>> group2 = new ArrayList<>();
+
+      int count = 0;
+      for (Pair<SnapshotFileInfo, Long> file : snapshotFiles) {
+        if (count % 2 == 0) {
+          group1.add(file);
+        } else {
+          group2.add(file);
+        }
+        count++;
+      }
+
+      groups.add(group1);
+      groups.add(group2);
+      return groups;
+    }
+  }
+
+  public static class TestFileLocationResolver implements ExportSnapshot.FileLocationResolver {
+    @Override
+    public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
+      Set<String> locations = new HashSet<>();
+      locations.add("rack1");
+      locations.add("rack2");
+      return locations;
+    }
+  }
 }
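
For readers who want to try the new hooks, here is a minimal sketch of a matching CustomFileGrouper/FileLocationResolver pair. It is illustrative only and not part of this commit: the package, class names, and the rackFor() heuristic are invented for the example, and a real implementation would derive rack assignments from HDFS block locations or your cluster's topology mapping rather than from a file-name hash.

    package org.example.hbase.export; // hypothetical package, not in the commit

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    // The shaded protobuf SnapshotFileInfo used by ExportSnapshot.
    import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
    import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
    import org.apache.hadoop.hbase.util.Pair;

    public final class RackAwareExportPlugins {

      /** Stand-in for a real topology lookup (e.g. HDFS block locations). */
      static String rackFor(SnapshotFileInfo file) {
        return (file.getHfile().hashCode() & 1) == 0 ? "rack1" : "rack2";
      }

      /** Buckets input files by rack so that no InputSplit mixes racks. */
      public static class Grouper implements ExportSnapshot.CustomFileGrouper {
        @Override
        public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
          getGroupedInputFiles(Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
          Map<String, Collection<Pair<SnapshotFileInfo, Long>>> byRack = new HashMap<>();
          for (Pair<SnapshotFileInfo, Long> file : snapshotFiles) {
            byRack.computeIfAbsent(rackFor(file.getFirst()), k -> new ArrayList<>()).add(file);
          }
          // Each rack becomes one top-level entry, so files from different
          // racks are guaranteed not to share an InputSplit.
          return byRack.values();
        }
      }

      /** Reports the racks covering a split, so YARN can prefer those racks. */
      public static class Locations implements ExportSnapshot.FileLocationResolver {
        @Override
        public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
          return files.stream().map(f -> rackFor(f.getFirst())).collect(Collectors.toSet());
        }
      }
    }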
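Assuming classes like the above are on the job's classpath, an invocation could look like the following; the snapshot name, destination, and class names are placeholders:

    hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
      --snapshot my_snapshot \
      --copy-to hdfs://backup-nn:8020/hbase \
      --custom-file-grouper 'org.example.hbase.export.RackAwareExportPlugins$Grouper' \
      --file-location-resolver 'org.example.hbase.export.RackAwareExportPlugins$Locations' \
      --mappers 4

Note the changed semantics called out in the option help: because a custom file grouper is supplied, --mappers 4 requests four mappers per group, not four in total.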