Repository: crunch
Updated Branches:
  refs/heads/master 894446d66 -> 2292c7491
changes to support only affected regions during hfile write

Signed-off-by: Micah Whitacre <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2292c749
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2292c749
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2292c749

Branch: refs/heads/master
Commit: 2292c7491f14a634716126dc161184607eb6b704
Parents: 894446d
Author: Stephen Durfey <[email protected]>
Authored: Fri Feb 12 17:14:01 2016 -0600
Committer: Micah Whitacre <[email protected]>
Committed: Fri Feb 12 17:35:41 2016 -0600

----------------------------------------------------------------------
 .../crunch/lib/sort/TotalOrderPartitioner.java |   8 +-
 .../apache/crunch/io/hbase/HFileTargetIT.java  | 135 +++++++++++++++++-
 .../org/apache/crunch/io/hbase/HFileUtils.java | 140 ++++++++++++++++++-
 3 files changed, 271 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
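For orientation before the file-by-file diff: the commit adds boolean overloads to HFileUtils so a caller can ask that HFiles be planned only for the regions its data actually touches. A minimal usage sketch follows (not part of the commit itself; the pipeline, puts, table, and output path are assumed to exist and are named here only for illustration):

    // Sketch of a caller opting into the new overload introduced by this commit.
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.io.hbase.HFileUtils;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;

    public class OnlyAffectedRegionsExample {
      public static void writeHFiles(Pipeline pipeline, PCollection<Put> puts,
                                     HTable table, Path outputPath) throws Exception {
        // New flag: identify the affected regions up front so the job only needs
        // one reducer per affected region rather than one per region in the table.
        boolean onlyAffectedRegions = true;
        HFileUtils.writePutsToHFilesForIncrementalLoad(puts, table, outputPath, onlyAffectedRegions);
        // The existing three-argument overload still compiles; it now delegates
        // to this overload with the flag set to false.
        pipeline.run();
      }
    }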
http://git-wip-us.apache.org/repos/asf/crunch/blob/2292c749/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
index 94fbdbe..653c294 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
@@ -122,18 +122,18 @@ public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Co
   /**
    * Interface to the partitioner to locate a key in the partition keyset.
    */
-  interface Node<T> {
+  public interface Node<T> {
     /**
      * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
      * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
      */
    int findPartition(T key);
   }
-
-  class BinarySearchNode implements Node<K> {
+
+  public static class BinarySearchNode<K> implements Node<K> {
     private final K[] splitPoints;
     private final RawComparator<K> comparator;
-    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+    public BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
       this.splitPoints = splitPoints;
       this.comparator = comparator;
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/2292c749/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 5bcc51c..c78ae75 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.hbase;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import org.apache.commons.io.IOUtils;
@@ -33,15 +34,21 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.seq.SeqFileReaderFactory;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.WritableDeepCopier;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -49,6 +56,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -63,6 +71,9 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -76,6 +87,8 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -226,6 +239,7 @@ public class HFileTargetIT implements Serializable {
     HTable table1 = createTable(26);
     HTable table2 = createTable(26);
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration());
+    boolean onlyAffectedRegions = true;
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
@@ -240,7 +254,8 @@ public class HFileTargetIT implements Serializable {
     HFileUtils.writePutsToHFilesForIncrementalLoad(
         convertToPuts(longWordCounts),
         table2,
-        outputPath2);
+        outputPath2,
+        onlyAffectedRegions);
 
     PipelineResult result = pipeline.run();
     assertTrue(result.succeeded());
@@ -298,6 +313,124 @@ public class HFileTargetIT implements Serializable {
     assertTrue(hfilesCount > 0);
   }
 
+  /**
+   * @see <a href='https://issues.apache.org/jira/browse/CRUNCH-588'>CRUNCH-588</a>
+   */
+  @Test
+  public void testOnlyAffectedRegionsWhenWritingHFiles() throws Exception {
+    Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
+    Path inputPath = copyResourceFileToHDFS("shakes.txt");
+    Path outputPath1 = getTempPathOnHDFS("out1");
+    HTable table1 = createTable(26);
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<String> words = split(shakespeare, "\\s+");
+    // take the top 5 here to reduce the number of affected regions in the table
+    PTable<String, Long> count = words.filter(SHORT_WORD_FILTER).count().top(5);
+    boolean onlyAffectedRegions = true;
+
+    PCollection<Put> wordPuts = convertToPuts(count);
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        wordPuts,
+        table1,
+        outputPath1,
+        onlyAffectedRegions);
+
+    // locate partition file directory and read it in to verify
+    // the number of regions to be written to are less than the
+    // number of regions in the table
+    String tempPath = ((DistributedPipeline) pipeline).createTempPath().toString();
+    Path temp = new Path(tempPath.substring(0, tempPath.lastIndexOf("/")));
+    FileSystem fs = FileSystem.get(pipeline.getConfiguration());
+    Path partitionPath = null;
+
+    for (final FileStatus fileStatus : fs.listStatus(temp)) {
+      RemoteIterator<LocatedFileStatus> remoteFIles = fs.listFiles(fileStatus.getPath(), true);
+
+      while(remoteFIles.hasNext()) {
+        LocatedFileStatus file = remoteFIles.next();
+        if(file.getPath().toString().contains("partition")) {
+          partitionPath = file.getPath();
+          System.out.println("found written partitions in path: " + partitionPath.toString());
+          break;
+        }
+      }
+
+      if(partitionPath != null) {
+        break;
+      }
+    }
+
+    if(partitionPath == null) {
+      throw new AssertionError("Partition path was not found");
+    }
+
+    Class<BytesWritable> keyClass = BytesWritable.class;
+    List<BytesWritable> writtenPartitions = new ArrayList<>();
+    WritableDeepCopier wdc = new WritableDeepCopier(keyClass);
+    SeqFileReaderFactory<BytesWritable> s = new SeqFileReaderFactory<>(keyClass);
+
+    // read back in the partition file
+    Iterator<BytesWritable> iter = CompositePathIterable.create(fs, partitionPath, s).iterator();
+    while (iter.hasNext()) {
+      BytesWritable next = iter.next();
+      writtenPartitions.add((BytesWritable) wdc.deepCopy(next));
+    }
+
+    ImmutableList<byte[]> startKeys = ImmutableList.copyOf(table1.getStartKeys());
+    // assert that only affected regions were loaded into
+    assertTrue(startKeys.size() > writtenPartitions.size());
+
+    // write out and read back in the start keys for each region.
+    // do this to get proper byte alignment
+    Path regionStartKeys = tmpDir.getPath("regionStartKeys");
+    List<KeyValue> startKeysToWrite = Lists.newArrayList();
+    for (final byte[] startKey : startKeys.subList(1, startKeys.size())) {
+      startKeysToWrite.add(KeyValueUtil.createFirstOnRow(startKey));
+    }
+    writeToSeqFile(pipeline.getConfiguration(), regionStartKeys, startKeysToWrite);
+
+    List<BytesWritable> writtenStartKeys = new ArrayList<>();
+    iter = CompositePathIterable.create(fs, partitionPath, s).iterator();
+    while (iter.hasNext()) {
+      BytesWritable next = iter.next();
+      writtenStartKeys.add((BytesWritable) wdc.deepCopy(next));
+    }
+
+    // assert the keys read back in match start keys for a region on the table
+    for (final BytesWritable writtenPartition : writtenPartitions) {
+      boolean foundMatchingKv = false;
+      for (final BytesWritable writtenStartKey : writtenStartKeys) {
+        if (writtenStartKey.equals(writtenPartition)) {
+          foundMatchingKv = true;
+          break;
+        }
+      }
+
+      if(!foundMatchingKv) {
+        throw new AssertionError("Written KeyValue: " + writtenPartition + " did not match any known start keys of the table");
+      }
+    }
+
+    pipeline.done();
+  }
+
+  private static void writeToSeqFile(
+      Configuration conf,
+      Path path,
+      List<KeyValue> splitPoints) throws IOException {
+    SequenceFile.Writer writer = SequenceFile.createWriter(
+        path.getFileSystem(conf),
+        conf,
+        path,
+        NullWritable.class,
+        BytesWritable.class);
+    for (KeyValue key : splitPoints) {
+      writer.append(NullWritable.get(), HBaseTypes.keyValueToBytes(key));
+    }
+    writer.close();
+  }
+
   private static PCollection<Put> convertToPuts(PTable<String, Long> in) {
     return convertToPuts(in, TEST_FAMILY);
   }
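The HFileUtils changes that follow reuse TotalOrderPartitioner.BinarySearchNode (made public above) to map each row to the start key of the region it falls into. A rough standalone sketch of that lookup, assuming the region start keys have already been fetched and sorted; the class and method names here are illustrative, not part of the commit:

    // Sketch: binary-search sorted region start keys to find the region a row
    // belongs to, mirroring what DetermineAffectedRegionsFn below does per Cell.
    import java.util.List;
    import com.google.common.collect.Lists;
    import org.apache.crunch.lib.sort.TotalOrderPartitioner;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValueUtil;

    public class RegionLookupSketch {
      // sortedStartKeys: table.getStartKeys() sorted with Bytes.BYTES_COMPARATOR;
      // assumes the table has more than one region.
      public static Cell findRegionStartKey(List<byte[]> sortedStartKeys, byte[] row) {
        List<Cell> regionStartKeys = Lists.newArrayList();
        // skip the first (empty) start key, as the commit does
        for (byte[] startKey : sortedStartKeys.subList(1, sortedStartKeys.size())) {
          regionStartKeys.add(KeyValueUtil.createFirstOnRow(startKey));
        }
        TotalOrderPartitioner.Node<Cell> partitions = new TotalOrderPartitioner.BinarySearchNode<>(
            regionStartKeys.toArray(new Cell[regionStartKeys.size()]), new KeyValue.KVComparator());
        int position = partitions.findPartition(KeyValueUtil.createFirstOnRow(row));
        // a position past the last start key is clamped to the last region,
        // matching the guard in DetermineAffectedRegionsFn
        if (position >= regionStartKeys.size()) {
          position = regionStartKeys.size() - 1;
        }
        return regionStartKeys.get(position);
      }
    }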
http://git-wip-us.apache.org/repos/asf/crunch/blob/2292c749/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 31684d2..57fdffb 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -36,6 +37,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.FilterFn;
@@ -47,6 +49,7 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.lib.sort.TotalOrderPartitioner;
+import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -370,15 +374,32 @@ public final class HFileUtils {
   }
 
   public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+      PCollection<C> cells,
+      HTable table,
+      Path outputPath) throws IOException {
+    writeToHFilesForIncrementalLoad(cells, table, outputPath, false);
+  }
+
+  /**
+   * Writes out HFiles from the provided <code>cells</code> and <code>table</code>. <code>limitToAffectedRegions</code>
+   * is used to indicate that the regions the <code>cells</code> will be loaded into should be identified prior to writing
+   * HFiles. Identifying the regions ahead of time will reduce the number of reducers needed when writing. This is
+   * beneficial if the data to be loaded only touches a small enough subset of the total regions in the table. If set to
+   * false, the number of reducers will equal the number of regions in the table.
+   *
+   * @see <a href='https://issues.apache.org/jira/browse/CRUNCH-588'>CRUNCH-588</a>
+   */
+  public static <C extends Cell> void writeToHFilesForIncrementalLoad(
       PCollection<C> cells,
       HTable table,
-      Path outputPath) throws IOException {
+      Path outputPath,
+      boolean limitToAffectedRegions) throws IOException {
     HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
     if (families.length == 0) {
       LOG.warn("{} has no column families", table);
       return;
     }
-    PCollection<C> partitioned = sortAndPartition(cells, table);
+    PCollection<C> partitioned = sortAndPartition(cells, table, limitToAffectedRegions);
     for (HColumnDescriptor f : families) {
       byte[] family = f.getName();
       partitioned
@@ -388,9 +409,26 @@ public final class HFileUtils {
   }
 
   public static void writePutsToHFilesForIncrementalLoad(
+      PCollection<Put> puts,
+      HTable table,
+      Path outputPath) throws IOException {
+    writePutsToHFilesForIncrementalLoad(puts, table, outputPath, false);
+  }
+
+  /**
+   * Writes out HFiles from the provided <code>puts</code> and <code>table</code>. <code>limitToAffectedRegions</code>
+   * is used to indicate that the regions the <code>puts</code> will be loaded into should be identified prior to writing
+   * HFiles. Identifying the regions ahead of time will reduce the number of reducers needed when writing. This is
+   * beneficial if the data to be loaded only touches a small enough subset of the total regions in the table. If set to
+   * false, the number of reducers will equal the number of regions in the table.
+   *
+   * @see <a href='https://issues.apache.org/jira/browse/CRUNCH-588'>CRUNCH-588</a>
+   */
+  public static void writePutsToHFilesForIncrementalLoad(
       PCollection<Put> puts,
       HTable table,
-      Path outputPath) throws IOException {
+      Path outputPath,
+      boolean limitToAffectedRegions) throws IOException {
     PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() {
       @Override
       public void process(Put input, Emitter<Cell> emitter) {
@@ -399,10 +437,21 @@ public final class HFileUtils {
         }
       }
     }, HBaseTypes.cells());
-    writeToHFilesForIncrementalLoad(cells, table, outputPath);
+    writeToHFilesForIncrementalLoad(cells, table, outputPath, limitToAffectedRegions);
   }
 
   public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException {
+    return sortAndPartition(cells, table, false);
+  }
+
+  /**
+   * Sorts and partitions the provided <code>cells</code> for the given <code>table</code> to ensure all elements that belong
+   * in the same region end up in the same reducer. The flag <code>limitToAffectedRegions</code>, when set to true, will identify
+   * the regions the data in <code>cells</code> belongs to and will set the number of reducers equal to the number of identified
+   * affected regions. If set to false, then all regions will be used, and the number of reducers will be set to the number
+   * of regions in the table.
+   */
+  public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table, boolean limitToAffectedRegions) throws IOException {
     Configuration conf = cells.getPipeline().getConfiguration();
     PTable<C, Void> t = cells.parallelDo(
         "Pre-partition",
@@ -412,7 +461,13 @@ public final class HFileUtils {
         return Pair.of(input, (Void) null);
       }
     }, tableOf(cells.getPType(), nulls()));
-    List<KeyValue> splitPoints = getSplitPoints(table);
+
+    List<KeyValue> splitPoints;
+    if(limitToAffectedRegions) {
+      splitPoints = getSplitPoints(table, t);
+    } else {
+      splitPoints = getSplitPoints(table);
+    }
     Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition");
     writePartitionInfo(conf, partitionFile, splitPoints);
     GroupingOptions options = GroupingOptions.builder()
@@ -431,13 +486,84 @@ public final class HFileUtils {
     }
     List<KeyValue> splitPoints = Lists.newArrayList();
     for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
-      KeyValue kv = KeyValue.createFirstOnRow(startKey);
-      LOG.debug("split row: " + Bytes.toString(kv.getRow()));
+      KeyValue kv = KeyValueUtil.createFirstOnRow(startKey);
+      LOG.debug("split row: " + Bytes.toString(CellUtil.cloneRow(kv)));
       splitPoints.add(kv);
     }
     return splitPoints;
   }
 
+  private static <C> List<KeyValue> getSplitPoints(HTable table, PTable<C, Void> affectedRows) throws IOException {
+    List<byte[]> startKeys;
+    try {
+      startKeys = Lists.newArrayList(table.getStartKeys());
+      if (startKeys.isEmpty()) {
+        throw new AssertionError(table + " has no regions!");
+      }
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+
+    Collections.sort(startKeys, Bytes.BYTES_COMPARATOR);
+
+    Iterable<ByteBuffer> bufferedStartKeys = affectedRows
+        .parallelDo(new DetermineAffectedRegionsFn(startKeys), Writables.bytes()).materialize();
+
+    // set to get rid of the potential duplicate start keys emitted
+    ImmutableSet.Builder<KeyValue> startKeyBldr = ImmutableSet.builder();
+    for (final ByteBuffer bufferedStartKey : bufferedStartKeys) {
+      startKeyBldr.add(KeyValueUtil.createFirstOnRow(bufferedStartKey.array()));
+    }
+
+    return ImmutableList.copyOf(startKeyBldr.build());
+  }
+
+  /**
+   * Spins through the {@link Cell}s and determines which regions the data
+   * will be loaded into. Searching the regions is done via a binary search. The
+   * region start key should be provided by the caller to cut down on calls to
+   * HMaster to get those start keys.
+   */
+  public static class DetermineAffectedRegionsFn<C extends Cell> extends DoFn<Pair<C, Void>, ByteBuffer> {
+
+    private final Set<Cell> startKeysToEmit = new HashSet<>();
+    List<byte[]> startKeys;
+    TotalOrderPartitioner.Node partitions;
+    List<Cell> regionStartKeys = Lists.newArrayList();
+
+    public DetermineAffectedRegionsFn(List<byte[]> startKeys) {
+      this.startKeys = startKeys;
+    }
+
+    @Override
+    public void initialize() {
+      for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
+        Cell cell = KeyValueUtil.createFirstOnRow(startKey);
+        regionStartKeys.add(cell);
+      }
+
+      partitions = new TotalOrderPartitioner.BinarySearchNode<>(regionStartKeys.toArray(new Cell[regionStartKeys.size()]),
+          new KeyValue.KVComparator());
+    }
+
+    @Override
+    public void process(Pair<C, Void> input, Emitter<ByteBuffer> emitter) {
+      int position = partitions.findPartition(new KeyValue(input.first().getFamilyArray()));
+      // if the position is after the last key, use the last start key
+      // as the split for this key, since it should fall into that region
+      if (position >= regionStartKeys.size() && regionStartKeys.size() > 1) {
+        position = regionStartKeys.size() - 1;
+      }
+
+      Cell foundCell = regionStartKeys.get(position);
+
+      if (!startKeysToEmit.contains(foundCell)) {
+        startKeysToEmit.add(foundCell);
+        emitter.emit(ByteBuffer.wrap(CellUtil.cloneRow(foundCell)));
+      }
+    }
+  }
+
   private static void writePartitionInfo(
       Configuration conf,
       Path path,
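For completeness, the HFiles produced by the methods above are then handed to HBase's incremental bulk-load tool, which is what the integration test's LoadIncrementalHFiles instance is for. A minimal sketch of that final step, using the standard HBase API rather than anything added by this commit (the output path and table are assumed):

    // Sketch: bulk-load the HFiles written by writePutsToHFilesForIncrementalLoad.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadAfterPipeline {
      public static void load(Path hfileOutputPath, HTable table) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // moves the generated HFiles under hfileOutputPath into the table's regions
        loader.doBulkLoad(hfileOutputPath, table);
      }
    }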
