Repository: crunch
Updated Branches:
  refs/heads/master 6c8c5ffd0 -> 5bd258d1c
CRUNCH-475: Update HBase to 1.0.0 and Hadoop to 2.5.2


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5bd258d1
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5bd258d1
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5bd258d1

Branch: refs/heads/master
Commit: 5bd258d1c0244fdb2c8a6d83b766fde08bb33354
Parents: 6c8c5ff
Author: Josh Wills <[email protected]>
Authored: Tue Jan 13 16:13:25 2015 -0800
Committer: Josh Wills <[email protected]>
Committed: Sat May 23 14:12:04 2015 -0700

----------------------------------------------------------------------
 crunch-hbase/pom.xml                            |  78 ++++---------
 .../apache/crunch/io/hbase/HFileTargetIT.java   |  17 ++-
 .../org/apache/crunch/io/hbase/HBaseTypes.java  |  33 +++++-
 .../crunch/io/hbase/HFileInputFormat.java       |   9 +-
 .../io/hbase/HFileOutputFormatForCrunch.java    |  22 ++--
 .../crunch/io/hbase/HFileReaderFactory.java     |   6 +-
 .../org/apache/crunch/io/hbase/HFileTarget.java |  10 +-
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 115 +++++++++----------
 crunch-spark/pom.xml                            |   4 +
 pom.xml                                         |  11 +-
 10 files changed, 153 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
index d1a33f5..5956faf 100644
--- a/crunch-hbase/pom.xml
+++ b/crunch-hbase/pom.xml
@@ -88,12 +88,6 @@ under the License.
 
     <dependency>
       <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-${hbase.midfix}-compat</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-testing-util</artifactId>
       <scope>provided</scope>
     </dependency>
@@ -128,55 +122,33 @@ under the License.
<scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </dependency> </dependencies> - <profiles> - <profile> - <id>hadoop-1</id> - <activation> - <property> - <name>!crunch.platform</name> - </property> - </activation> - </profile> - <profile> - <id>hadoop-2</id> - <activation> - <property> - <name>crunch.platform</name> - <value>2</value> - </property> - </activation> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - </dependency> - </dependencies> - </profile> - </profiles> - <build> <plugins> <plugin> http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java index 71cf31f..ddb1292 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.io.hbase; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; @@ -41,11 +42,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; @@ -316,7 +320,8 @@ public class HFileTargetIT implements Serializable { w = "__EMPTY__"; } long c = input.second(); - return Pair.of(new KeyValue(Bytes.toBytes(w), TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(c)), null); 
+ Cell cell = CellUtil.createCell(Bytes.toBytes(w), Bytes.toBytes(c)); + return Pair.of(KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()), null); } }, tableOf(HBaseTypes.keyValues(), nulls())) .groupByKey(GroupingOptions.builder() @@ -359,9 +364,9 @@ public class HFileTargetIT implements Serializable { KeyValueScanner kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); boolean seekOk = kvh.seek(fakeKV); assertTrue(seekOk); - KeyValue kv = kvh.next(); + Cell kv = kvh.next(); kvh.close(); - return kv; + return KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of()); } private static Path copyResourceFileToHDFS(String resourceName) throws IOException { @@ -390,11 +395,11 @@ public class HFileTargetIT implements Serializable { private static long getWordCountFromTable(HTable table, String word) throws IOException { Get get = new Get(Bytes.toBytes(word)); - KeyValue keyValue = table.get(get).getColumnLatest(TEST_FAMILY, TEST_QUALIFIER); - if (keyValue == null) { + byte[] value = table.get(get).value(); + if (value == null) { fail("no such row: " + word); } - return Bytes.toLong(keyValue.getValue()); + return Bytes.toLong(value); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java index f8a259d..787b9c6 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java @@ -19,15 +19,17 @@ */ package org.apache.crunch.io.hbase; +import com.google.common.collect.ImmutableList; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.MapFn; import org.apache.crunch.types.PType; import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization; import org.apache.hadoop.hbase.mapreduce.MutationSerialization; import org.apache.hadoop.hbase.mapreduce.ResultSerialization; import org.apache.hadoop.io.BytesWritable; @@ -83,15 +85,36 @@ public final class HBaseTypes { Writables.writables(BytesWritable.class)); } - public static BytesWritable keyValueToBytes(KeyValue input) { + public static final PType<Cell> cells() { + return Writables.derived(Cell.class, + new MapFn<BytesWritable, Cell>() { + @Override + public Cell map(BytesWritable input) { + return bytesToKeyValue(input); + } + }, + new MapFn<Cell, BytesWritable>() { + @Override + public BytesWritable map(Cell input) { + return keyValueToBytes(input); + } + }, + Writables.writables(BytesWritable.class)); + } + + public static BytesWritable keyValueToBytes(Cell input) { + return keyValueToBytes(KeyValue.cloneAndAddTags(input, ImmutableList.<Tag>of())); + } + + public static BytesWritable keyValueToBytes(KeyValue kv) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); try { - KeyValue.write(input, dos); - } catch (IOException e) { + KeyValue.write(kv, dos); + return new BytesWritable(baos.toByteArray()); + } catch (Exception e) { throw new CrunchRuntimeException(e); } - return new 
BytesWritable(baos.toByteArray()); } public static KeyValue bytesToKeyValue(BytesWritable input) { http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java index ca886f6..1d8e106 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.io.hbase; +import com.google.common.collect.ImmutableList; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.conf.Configuration; @@ -24,7 +25,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; @@ -140,10 +143,8 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> { if (!hasNext) { return false; } - value = scanner.getKeyValue(); - if (stopRow != null && Bytes.compareTo( - value.getBuffer(), value.getRowOffset(), value.getRowLength(), - stopRow, 0, stopRow.length) >= 0) { + value = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of()); + if (stopRow != null && Bytes.compareTo(CellUtil.cloneRow(value), stopRow) >= 0) { if(LOG.isInfoEnabled()) { LOG.info("Reached stop row {}", Bytes.toStringBinary(stopRow)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java index 0c64e5e..7611235 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java @@ -19,14 +19,17 @@ */ package org.apache.crunch.io.hbase; +import com.google.common.collect.ImmutableList; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -44,7 +47,7 @@ import java.io.DataInputStream; import java.io.IOException; /** - * This is a thin wrapper of {@link HFile.Writer}. It only calls {@link HFile.Writer#append(byte[], byte[])} + * This is a thin wrapper of {@link HFile.Writer}. It only calls {@link HFile.Writer#append} * when records are emitted. 
It only supports writing data into a single column family. Records MUST be sorted * by their column qualifier, then timestamp reversely. All data are written into a single HFile. * @@ -53,7 +56,7 @@ import java.io.IOException; * As crunch supports more complex and flexible MapReduce pipeline, we would prefer thin and pure * {@code OutputFormat} here. */ -public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValue> { +public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> { public static final String HCOLUMN_DESCRIPTOR_KEY = "hbase.hfileoutputformat.column.descriptor"; private static final String COMPACTION_EXCLUDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.compaction.exclude"; @@ -63,7 +66,7 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu private final TimeRangeTracker trt = new TimeRangeTracker(); @Override - public RecordWriter<Object, KeyValue> getRecordWriter(final TaskAttemptContext context) + public RecordWriter<Object, Cell> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { Path outputPath = getDefaultWorkFile(context, ""); Configuration conf = context.getConfiguration(); @@ -92,15 +95,16 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu .withFileContext(getContext(hcol)) .create(); - return new RecordWriter<Object, KeyValue>() { + return new RecordWriter<Object, Cell>() { @Override - public void write(Object row, KeyValue kv) + public void write(Object row, Cell cell) throws IOException { - if (kv.getTimestamp() == HConstants.LATEST_TIMESTAMP) { - kv.updateLatestStamp(now); + KeyValue copy = KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()); + if (copy.getTimestamp() == HConstants.LATEST_TIMESTAMP) { + copy.updateLatestStamp(now); } - writer.append(kv); - trt.includeTimestamp(kv); + writer.append(copy); + trt.includeTimestamp(copy); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java index 6189775..14e6118 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java @@ -17,11 +17,13 @@ */ package org.apache.crunch.io.hbase; +import com.google.common.collect.ImmutableList; import org.apache.crunch.io.FileReaderFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; @@ -57,7 +59,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> { public HFileIterator(HFileScanner scanner) { this.scanner = scanner; - this.curr = scanner.getKeyValue(); + this.curr = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of()); } @Override @@ -70,7 +72,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> { KeyValue ret = curr; try { if (scanner.next()) { - curr = scanner.getKeyValue(); + curr = 
KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of()); } else { curr = null; } http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java index d9bbf7f..41d56ff 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java @@ -26,9 +26,9 @@ import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization; import org.apache.hadoop.io.WritableUtils; @@ -68,13 +68,13 @@ public class HFileTarget extends FileTargetImpl { if (ptype instanceof PTableType) { valueType = ((PTableType) ptype).getValueType(); } - if (!KeyValue.class.equals(valueType.getTypeClass())) { - throw new IllegalArgumentException("HFileTarget only supports KeyValue outputs"); + if (!Cell.class.isAssignableFrom(valueType.getTypeClass())) { + throw new IllegalArgumentException("HFileTarget only supports Cell outputs"); } if (ptype instanceof PTableType) { - return new HBasePairConverter<ImmutableBytesWritable, KeyValue>(ImmutableBytesWritable.class, KeyValue.class); + return new HBasePairConverter<ImmutableBytesWritable, Cell>(ImmutableBytesWritable.class, Cell.class); } - return new HBaseValueConverter<KeyValue>(KeyValue.class); + return new HBaseValueConverter<Cell>(Cell.class); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java index 252bad7..34118ca 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java @@ -24,7 +24,6 @@ import static org.apache.crunch.types.writable.Writables.tableOf; import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -34,6 +33,7 @@ import java.util.Set; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; import org.apache.crunch.DoFn; @@ -49,9 +49,12 @@ import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.lib.sort.TotalOrderPartitioner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import 
org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -113,7 +116,7 @@ public final class HFileUtils { }; - private static class FilterByFamilyFn extends FilterFn<KeyValue> { + private static class FilterByFamilyFn<C extends Cell> extends FilterFn<C> { private final byte[] family; @@ -122,14 +125,12 @@ public final class HFileUtils { } @Override - public boolean accept(KeyValue input) { - return Bytes.equals( - input.getBuffer(), input.getFamilyOffset(), input.getFamilyLength(), - family, 0, family.length); + public boolean accept(C input) { + return Bytes.equals(CellUtil.cloneFamily(input), family); } } - private static class StartRowFilterFn extends FilterFn<KeyValue> { + private static class StartRowFilterFn<C extends Cell> extends FilterFn<C> { private final byte[] startRow; @@ -138,12 +139,12 @@ public final class HFileUtils { } @Override - public boolean accept(KeyValue input) { - return Bytes.compareTo(input.getRow(), startRow) >= 0; + public boolean accept(C input) { + return Bytes.compareTo(CellUtil.cloneRow(input), startRow) >= 0; } } - private static class StopRowFilterFn extends FilterFn<KeyValue> { + private static class StopRowFilterFn<C extends Cell> extends FilterFn<C> { private final byte[] stopRow; @@ -152,12 +153,12 @@ public final class HFileUtils { } @Override - public boolean accept(KeyValue input) { - return Bytes.compareTo(input.getRow(), stopRow) < 0; + public boolean accept(C input) { + return Bytes.compareTo(CellUtil.cloneRow(input), stopRow) < 0; } } - private static class FamilyMapFilterFn extends FilterFn<KeyValue> { + private static class FamilyMapFilterFn<C extends Cell> extends FilterFn<C> { private static class Column implements Serializable { @@ -216,15 +217,14 @@ public final class HFileUtils { } @Override - public boolean accept(KeyValue input) { - byte[] b = input.getBuffer(); - ByteBuffer f = ByteBuffer.wrap(b, input.getFamilyOffset(), input.getFamilyLength()); - ByteBuffer q = ByteBuffer.wrap(b, input.getQualifierOffset(), input.getQualifierLength()); + public boolean accept(C input) { + ByteBuffer f = ByteBuffer.wrap(CellUtil.cloneFamily(input)); + ByteBuffer q = ByteBuffer.wrap(CellUtil.cloneQualifier(input)); return familySet.contains(f) || qualifierSet.contains(Pair.of(f, q)); } } - private static class TimeRangeFilterFn extends FilterFn<KeyValue> { + private static class TimeRangeFilterFn<C extends Cell> extends FilterFn<C> { private final long minTimestamp; private final long maxTimestamp; @@ -236,7 +236,7 @@ public final class HFileUtils { } @Override - public boolean accept(KeyValue input) { + public boolean accept(C input) { return (minTimestamp <= input.getTimestamp() && input.getTimestamp() < maxTimestamp); } } @@ -253,8 +253,8 @@ public final class HFileUtils { if (rlength < 4) { throw new AssertionError("Too small rlength: " + rlength); } - KeyValue leftKey = HBaseTypes.bytesToKeyValue(left, loffset + 4, llength - 4); - KeyValue rightKey = HBaseTypes.bytesToKeyValue(right, roffset + 4, rlength - 4); + Cell leftKey = HBaseTypes.bytesToKeyValue(left, loffset + 4, llength - 4); + Cell rightKey = HBaseTypes.bytesToKeyValue(right, roffset + 4, rlength - 4); byte[] lRow = leftKey.getRow(); byte[] rRow = rightKey.getRow(); @@ -274,14 +274,13 @@ public final class HFileUtils { } } - private static final MapFn<KeyValue, ByteBuffer> EXTRACT_ROW_FN = new MapFn<KeyValue, ByteBuffer>() { + private static class ExtractRowFn<C extends Cell> 
extends MapFn<C, ByteBuffer> { @Override - public ByteBuffer map(KeyValue input) { + public ByteBuffer map(Cell input) { // we have to make a copy of row, because the buffer may be changed after this call - return ByteBuffer.wrap(Arrays.copyOfRange( - input.getBuffer(), input.getRowOffset(), input.getRowOffset() + input.getRowLength())); + return ByteBuffer.wrap(CellUtil.cloneRow(input)); } - }; + } public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path) { return scanHFiles(pipeline, path, new Scan()); @@ -305,8 +304,8 @@ public final class HFileUtils { return combineIntoRow(in, scan); } - public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs) { - return combineIntoRow(kvs, new Scan()); + public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells) { + return combineIntoRow(cells, new Scan()); } /** @@ -316,41 +315,41 @@ public final class HFileUtils { * conditions (specified by {@code scan}). Deletes are dropped and only a specified number * of versions are kept. * - * @param kvs the input {@code KeyValue}s + * @param cells the input {@code KeyValue}s * @param scan filter conditions, currently we support start row, stop row and family map * @return {@code Result}s */ - public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs, Scan scan) { + public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells, Scan scan) { if (!Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) { - kvs = kvs.filter(new StartRowFilterFn(scan.getStartRow())); + cells = cells.filter(new StartRowFilterFn<C>(scan.getStartRow())); } if (!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { - kvs = kvs.filter(new StopRowFilterFn(scan.getStopRow())); + cells = cells.filter(new StopRowFilterFn<C>(scan.getStopRow())); } if (scan.hasFamilies()) { - kvs = kvs.filter(new FamilyMapFilterFn(scan.getFamilyMap())); + cells = cells.filter(new FamilyMapFilterFn<C>(scan.getFamilyMap())); } TimeRange timeRange = scan.getTimeRange(); if (timeRange != null && (timeRange.getMin() > 0 || timeRange.getMax() < Long.MAX_VALUE)) { - kvs = kvs.filter(new TimeRangeFilterFn(timeRange)); + cells = cells.filter(new TimeRangeFilterFn<C>(timeRange)); } // TODO(chaoshi): support Scan#getFilter - PTable<ByteBuffer, KeyValue> kvsByRow = kvs.by(EXTRACT_ROW_FN, bytes()); + PTable<ByteBuffer, C> cellsByRow = cells.by(new ExtractRowFn<C>(), bytes()); final int versions = scan.getMaxVersions(); - return kvsByRow.groupByKey().parallelDo("CombineKeyValueIntoRow", - new DoFn<Pair<ByteBuffer, Iterable<KeyValue>>, Result>() { + return cellsByRow.groupByKey().parallelDo("CombineKeyValueIntoRow", + new DoFn<Pair<ByteBuffer, Iterable<C>>, Result>() { @Override - public void process(Pair<ByteBuffer, Iterable<KeyValue>> input, Emitter<Result> emitter) { - List<KeyValue> kvs = Lists.newArrayList(); - for (KeyValue kv : input.second()) { + public void process(Pair<ByteBuffer, Iterable<C>> input, Emitter<Result> emitter) { + List<KeyValue> cells = Lists.newArrayList(); + for (Cell kv : input.second()) { try { - kvs.add(kv.clone()); // assuming the input fits into memory + cells.add(KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of())); // assuming the input fits into memory } catch (Exception e) { throw new RuntimeException(e); } } - Result result = doCombineIntoRow(kvs, versions); + Result result = doCombineIntoRow(cells, versions); if (result == null) { return; } @@ -359,8 +358,8 @@ public final class HFileUtils { }, HBaseTypes.results()); 
} - public static void writeToHFilesForIncrementalLoad( - PCollection<KeyValue> kvs, + public static <C extends Cell> void writeToHFilesForIncrementalLoad( + PCollection<C> cells, HTable table, Path outputPath) throws IOException { HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies(); @@ -370,7 +369,7 @@ public final class HFileUtils { } for (HColumnDescriptor f : families) { byte[] family = f.getName(); - PCollection<KeyValue> sorted = sortAndPartition(kvs.filter(new FilterByFamilyFn(family)), table); + PCollection<C> sorted = sortAndPartition(cells.filter(new FilterByFamilyFn<C>(family)), table); sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f)); } } @@ -379,29 +378,27 @@ public final class HFileUtils { PCollection<Put> puts, HTable table, Path outputPath) throws IOException { - PCollection<KeyValue> kvs = puts.parallelDo("ConvertPutToKeyValue", new DoFn<Put, KeyValue>() { + PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() { @Override - public void process(Put input, Emitter<KeyValue> emitter) { - for (List<KeyValue> keyValues : input.getFamilyMap().values()) { - for (KeyValue keyValue : keyValues) { - emitter.emit(keyValue); - } + public void process(Put input, Emitter<Cell> emitter) { + for (Cell cell : Iterables.concat(input.getFamilyCellMap().values())) { + emitter.emit(cell); } } - }, HBaseTypes.keyValues()); - writeToHFilesForIncrementalLoad(kvs, table, outputPath); + }, HBaseTypes.cells()); + writeToHFilesForIncrementalLoad(cells, table, outputPath); } - public static PCollection<KeyValue> sortAndPartition(PCollection<KeyValue> kvs, HTable table) throws IOException { - Configuration conf = kvs.getPipeline().getConfiguration(); - PTable<KeyValue, Void> t = kvs.parallelDo(new MapFn<KeyValue, Pair<KeyValue, Void>>() { + public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException { + Configuration conf = cells.getPipeline().getConfiguration(); + PTable<C, Void> t = cells.parallelDo(new MapFn<C, Pair<C, Void>>() { @Override - public Pair<KeyValue, Void> map(KeyValue input) { + public Pair<C, Void> map(C input) { return Pair.of(input, (Void) null); } - }, tableOf(HBaseTypes.keyValues(), nulls())); + }, tableOf(cells.getPType(), nulls())); List<KeyValue> splitPoints = getSplitPoints(table); - Path partitionFile = new Path(((DistributedPipeline) kvs.getPipeline()).createTempPath(), "partition"); + Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition"); writePartitionInfo(conf, partitionFile, splitPoints); GroupingOptions options = GroupingOptions.builder() .partitionerClass(TotalOrderPartitioner.class) http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-spark/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml index 7757daf..690256a 100644 --- a/crunch-spark/pom.xml +++ b/crunch-spark/pom.xml @@ -52,6 +52,10 @@ under the License. 
<scope>provided</scope> <exclusions> <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> <groupId>com.sun.jersey</groupId> <artifactId>jersey-server</artifactId> </exclusion> http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index cce69bf..2eed8bc 100644 --- a/pom.xml +++ b/pom.xml @@ -89,10 +89,9 @@ under the License. <pkg>org.apache.crunch</pkg> <!-- Can be overridden by hadoop-2 profile, but these are the default values --> - <hadoop.version>2.2.0</hadoop.version> - <hbase.version>0.98.1-hadoop2</hbase.version> + <hadoop.version>2.5.2</hadoop.version> + <hbase.version>1.0.0</hbase.version> <commons-lang.version>2.5</commons-lang.version> - <hbase.midfix>hadoop2</hbase.midfix> <avro.classifier>hadoop2</avro.classifier> <scala.base.version>2.10</scala.base.version> @@ -325,12 +324,6 @@ under the License. <dependency> <groupId>org.apache.hbase</groupId> - <artifactId>hbase-${hbase.midfix}-compat</artifactId> - <version>${hbase.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> <artifactId>hbase-testing-util</artifactId> <version>${hbase.version}</version> </dependency>
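
----------------------------------------------------------------------
A quick usage sketch for reviewers (not part of this patch): the pipeline below
shows how client code can target the Cell-based API after this change. The input
path, table name "word_counts", family "f", and qualifier "count" are made up for
illustration; the HBaseTypes.cells() PType, CellUtil.createCell(...), and the now
generic HFileUtils.writeToHFilesForIncrementalLoad(...) are the pieces introduced
or changed by this commit.

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.hbase.HBaseTypes;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class CellBulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Pipeline pipeline = new MRPipeline(CellBulkLoadExample.class, conf);

    // Build one Cell per input line. CellUtil.createCell plus the new
    // HBaseTypes.cells() PType let client code stay on the Cell interface
    // instead of constructing KeyValues directly.
    PCollection<Cell> cells = pipeline.read(From.textFile(new Path(args[0])))
        .parallelDo(new MapFn<String, Cell>() {
          @Override
          public Cell map(String line) {
            return CellUtil.createCell(
                Bytes.toBytes(line),         // row
                Bytes.toBytes("f"),          // column family (must exist on the table)
                Bytes.toBytes("count"),      // qualifier
                HConstants.LATEST_TIMESTAMP,
                KeyValue.Type.Put.getCode(),
                Bytes.toBytes(1L));          // value
          }
        }, HBaseTypes.cells());

    // writeToHFilesForIncrementalLoad is now generic over <C extends Cell>:
    // it filters by column family, sorts and partitions on the table's region
    // boundaries, and writes one set of HFiles per family under the output path.
    HTable table = new HTable(conf, "word_counts");
    HFileUtils.writeToHFilesForIncrementalLoad(cells, table, new Path(args[1]));

    pipeline.done();
    table.close();
  }
}

Internally the patch keeps concrete KeyValues where HFile writing and Writable
serialization require them, bridging with
KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()) as seen in
HFileOutputFormatForCrunch, HBaseTypes, HFileInputFormat and HFileReaderFactory.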
