This is an automated email from the ASF dual-hosted git repository.

jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git
commit 31e822c9cc7edceadd1cb8d711034cc528ef92e5
Author: Andrew Olson <[email protected]>
AuthorDate: Thu Apr 18 10:26:48 2019 -0500

    CRUNCH-681: Updating HFileUtils to accept a filesystem parameter for targets and sources

    Signed-off-by: Josh Wills <[email protected]>
---
 .../org/apache/crunch/io/hbase/HFileUtils.java | 135 ++++++++++++++++++---
 1 file changed, 120 insertions(+), 15 deletions(-)

diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index d85481d..458ab22 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -52,6 +52,7 @@ import org.apache.crunch.lib.sort.TotalOrderPartitioner;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -300,11 +301,30 @@ public final class HFileUtils {
     }
   }
 
+  /**
+   * Scans HFiles.
+   *
+   * @param pipeline the pipeline
+   * @param path path to HFiles
+   * @return {@code Result}s
+   */
   public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path) {
     return scanHFiles(pipeline, path, new Scan());
   }
 
   /**
+   * Scans HFiles with source filesystem.
+   *
+   * @param pipeline the pipeline
+   * @param path path to HFiles
+   * @param fs filesystem where HFiles are located
+   * @return {@code Result}s
+   */
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, FileSystem fs) {
+    return scanHFiles(pipeline, path, new Scan(), fs);
+  }
+
+  /**
    * Scans HFiles with filter conditions.
    *
    * @param pipeline the pipeline
@@ -314,27 +334,74 @@ public final class HFileUtils {
    * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
    */
   public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan) {
-    return scanHFiles(pipeline, ImmutableList.of(path), scan);
+    return scanHFiles(pipeline, ImmutableList.of(path), scan);
   }
 
+  /**
+   * Scans HFiles with filter conditions and source filesystem.
+   *
+   * @param pipeline the pipeline
+   * @param path path to HFiles
+   * @param scan filtering conditions
+   * @param fs filesystem where HFiles are located
+   * @return {@code Result}s
+   * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+   */
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan, FileSystem fs) {
+    return scanHFiles(pipeline, ImmutableList.of(path), scan, fs);
+  }
+
+  /**
+   * Scans HFiles with filter conditions.
+   *
+   * @param pipeline the pipeline
+   * @param paths paths to HFiles
+   * @param scan filtering conditions
+   * @return {@code Result}s
+   * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+   */
   public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan) {
-    PCollection<KeyValue> in = pipeline.read(new HFileSource(paths, scan));
-    return combineIntoRow(in, scan);
+    return scanHFiles(pipeline, paths, scan, null);
+  }
+
+  /**
+   * Scans HFiles with filter conditions and source filesystem.
+   *
+   * @param pipeline the pipeline
+   * @param paths paths to HFiles
+   * @param scan filtering conditions
+   * @param fs filesystem where HFiles are located
+   * @return {@code Result}s
+   * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+   */
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan, FileSystem fs) {
+    PCollection<KeyValue> in = pipeline.read(new HFileSource(paths, scan).fileSystem(fs));
+    return combineIntoRow(in, scan);
   }
 
+  /**
+   * Converts a bunch of {@link Cell}s into {@link Result}.
+   *
+   * All {@code Cell}s belong to the same row are combined. Deletes are dropped and only
+   * the latest version is kept.
+   *
+   * @param cells the input {@code Cell}s
+   * @return {@code Result}s
+   */
   public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells) {
     return combineIntoRow(cells, new Scan());
   }
 
   /**
-   * Converts a bunch of {@link KeyValue}s into {@link Result}.
+   * Converts a bunch of {@link Cell}s into {@link Result}.
    *
-   * All {@code KeyValue}s belong to the same row are combined. Users may provide some filter
-   * conditions (specified by {@code scan}). Deletes are dropped and only a specified number
-   * of versions are kept.
+   * All {@code Cell}s belong to the same row are combined. Users may provide some filter
+   * conditions (specified by {@code scan}). Deletes are dropped and only the number
+   * of versions specified by {@code scan.getMaxVersions()} are kept.
    *
-   * @param cells the input {@code KeyValue}s
-   * @param scan filter conditions, currently we support start row, stop row and family map
+   * @param cells the input {@code Cell}s
+   * @param scan filter conditions, currently we support start row, stop row, family map,
+   *             time range, and max versions
    * @return {@code Result}s
    */
   public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells, Scan scan) {
@@ -384,6 +451,24 @@ public final class HFileUtils {
     writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, false);
   }
 
+  public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+      PCollection<C> cells,
+      Connection connection,
+      TableName tableName,
+      Path outputPath,
+      FileSystem fs) throws IOException {
+    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, false, fs);
+  }
+
+  public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+      PCollection<C> cells,
+      Connection connection,
+      TableName tableName,
+      Path outputPath,
+      boolean limitToAffectedRegions) throws IOException {
+    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions, null);
+  }
+
   /**
    * Writes out HFiles from the provided <code>cells</code> and <code>table</code>. <code>limitToAffectedRegions</code>
    * is used to indicate that the regions the <code>cells</code> will be loaded into should be identified prior to writing
@@ -398,7 +483,8 @@ public final class HFileUtils {
       Connection connection,
       TableName tableName,
       Path outputPath,
-      boolean limitToAffectedRegions) throws IOException {
+      boolean limitToAffectedRegions,
+      FileSystem fs) throws IOException {
     Table table = connection.getTable(tableName);
     RegionLocator regionLocator = connection.getRegionLocator(tableName);
     HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
@@ -416,11 +502,11 @@ public final class HFileUtils {
 
     for (HColumnDescriptor f : families) {
       byte[] family = f.getName();
-      HFileTarget hfileTarget = new HFileTarget(new Path(outputPath, Bytes.toString(family)), f);
-      hfileTarget.outputConf(RegionLocationTable.REGION_LOCATION_TABLE_PATH, regionLocationFilePath.toString());
       partitioned
           .filter(new FilterByFamilyFn<C>(family))
-          .write(hfileTarget);
+          .write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f)
+              .outputConf(RegionLocationTable.REGION_LOCATION_TABLE_PATH, regionLocationFilePath.toString())
+              .fileSystem(fs));
     }
   }
 
@@ -432,6 +518,24 @@ public final class HFileUtils {
     writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, false);
   }
 
+  public static void writePutsToHFilesForIncrementalLoad(
+      PCollection<Put> puts,
+      Connection connection,
+      TableName tableName,
+      Path outputPath,
+      FileSystem fs) throws IOException {
+    writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, false, fs);
+  }
+
+  public static void writePutsToHFilesForIncrementalLoad(
+      PCollection<Put> puts,
+      Connection connection,
+      TableName tableName,
+      Path outputPath,
+      boolean limitToAffectedRegions) throws IOException {
+    writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, limitToAffectedRegions, null);
+  }
+
   /**
    * Writes out HFiles from the provided <code>puts</code> and <code>table</code>. <code>limitToAffectedRegions</code>
    * is used to indicate that the regions the <code>puts</code> will be loaded into should be identified prior to writing
@@ -446,7 +550,8 @@ public final class HFileUtils {
       Connection connection,
       TableName tableName,
       Path outputPath,
-      boolean limitToAffectedRegions) throws IOException {
+      boolean limitToAffectedRegions,
+      FileSystem fs) throws IOException {
     PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() {
       @Override
       public void process(Put input, Emitter<Cell> emitter) {
@@ -455,7 +560,7 @@ public final class HFileUtils {
         }
       }
     }, HBaseTypes.cells());
-    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions);
+    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions, fs);
   }
 
   public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, RegionLocator regionLocator) throws IOException {
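
For anyone picking up this change, a minimal usage sketch of the new source-side
overload follows. This is not part of the commit: the class name, namenode URI,
and input path are hypothetical placeholders, and MRPipeline stands in for
whatever Pipeline implementation is in use.

  import java.net.URI;

  import org.apache.crunch.PCollection;
  import org.apache.crunch.Pipeline;
  import org.apache.crunch.impl.mr.MRPipeline;
  import org.apache.crunch.io.hbase.HFileUtils;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Scan;

  public class ScanHFilesFromRemoteFs {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Pipeline pipeline = new MRPipeline(ScanHFilesFromRemoteFs.class, conf);

      // Hypothetical: the HFiles live on a cluster other than fs.defaultFS.
      FileSystem sourceFs = FileSystem.get(URI.create("hdfs://archive-nn:8020"), conf);

      // New in CRUNCH-681: the trailing FileSystem argument tells the underlying
      // HFileSource where to resolve the input path. Scan conditions apply as before.
      PCollection<Result> results = HFileUtils.scanHFiles(
          pipeline, new Path("/hbase/archive/my_table"), new Scan(), sourceFs);

      // ... transform or write 'results' as usual ...
      pipeline.done();
    }
  }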
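
The write-side overloads can be exercised the same way. Another hedged sketch,
not from the commit itself: the connection setup, table name, and staging path
below are assumptions.

  import java.net.URI;

  import org.apache.crunch.PCollection;
  import org.apache.crunch.io.hbase.HFileUtils;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Put;

  public class WriteHFilesToTargetFs {
    static void stageHFiles(PCollection<Put> puts, Configuration conf) throws Exception {
      // Hypothetical: stage the generated HFiles on an explicitly chosen cluster.
      FileSystem targetFs = FileSystem.get(URI.create("hdfs://ingest-nn:8020"), conf);
      try (Connection connection = ConnectionFactory.createConnection(conf)) {
        // New overload from this commit; each HFileTarget it creates is bound
        // to targetFs rather than the pipeline's default filesystem.
        HFileUtils.writePutsToHFilesForIncrementalLoad(
            puts, connection, TableName.valueOf("my_table"),
            new Path("/tmp/hfile-staging"), targetFs);
      }
    }
  }

Callers of the pre-existing signatures are unaffected: as the diff shows, the
no-filesystem overloads now delegate with a null FileSystem, which keeps the
old default-filesystem behavior.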
