CRUNCH-41: Move HBase support into a sub-module. Move HBase code and test case to the new Maven module crunch-hbase. Move HBase static factories to FromHBase, ToHBase, and AtHBase. Remove unnecessary dependencies from crunch core.
Signed-off-by: jwills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a8691d0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a8691d0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a8691d0b Branch: refs/heads/master Commit: a8691d0bd193a5c514e8318d4aa927eee643e531 Parents: 7ab0da9 Author: Matthias Friedrich <[email protected]> Authored: Sun Aug 12 19:41:33 2012 +0200 Committer: jwills <[email protected]> Committed: Mon Aug 13 15:51:57 2012 -0700 ---------------------------------------------------------------------- crunch-hbase/pom.xml | 146 +++++++++ .../apache/crunch/io/hbase/WordCountHBaseIT.java | 229 +++++++++++++++ .../org/apache/crunch/test/TemporaryPaths.java | 40 +++ .../java/org/apache/crunch/io/hbase/AtHBase.java | 37 +++ .../java/org/apache/crunch/io/hbase/FromHBase.java | 39 +++ .../apache/crunch/io/hbase/HBaseSourceTarget.java | 108 +++++++ .../org/apache/crunch/io/hbase/HBaseTarget.java | 93 ++++++ .../java/org/apache/crunch/io/hbase/ToHBase.java | 31 ++ crunch/pom.xml | 41 +-- .../java/org/apache/crunch/WordCountHBaseIT.java | 223 -------------- crunch/src/main/java/org/apache/crunch/io/At.java | 10 - .../src/main/java/org/apache/crunch/io/From.java | 12 - crunch/src/main/java/org/apache/crunch/io/To.java | 5 - .../apache/crunch/io/hbase/HBaseSourceTarget.java | 108 ------- .../org/apache/crunch/io/hbase/HBaseTarget.java | 93 ------ pom.xml | 33 ++- 16 files changed, 767 insertions(+), 481 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml new file mode 100644 index 0000000..9ed06d0 --- /dev/null +++ b/crunch-hbase/pom.xml @@ -0,0 +1,146 @@ +<!-- 
+Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-parent</artifactId> + <version>0.3.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>crunch-hbase</artifactId> + <name>Apache Crunch HBase Support</name> + + <dependencies> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + 
<groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <phase>test-compile</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <excludeArtifactIds>crunch-test</excludeArtifactIds> + </configuration> + </execution> + </executions> + </plugin> + <!-- We put slow-running tests into src/it and run them during the + integration-test phase using the failsafe plugin. This way + developers can run unit tests conveniently from the IDE or via + "mvn package" from the command line without triggering time + consuming integration tests. --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-test-source</id> + <phase>validate</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>${basedir}/src/it/java</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-test-resource</id> + <phase>validate</phase> + <goals> + <goal>add-test-resource</goal> + </goals> + <configuration> + <resources> + <resource> + <directory>${basedir}/src/it/resources</directory> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <testSourceDirectory>${basedir}/src/it/java</testSourceDirectory> + </configuration> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java new file mode 100644 index 0000000..f13edeb --- /dev/null +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Random; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.hbase.HBaseSourceTarget; +import org.apache.crunch.io.hbase.HBaseTarget; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.io.ByteStreams; + +public class WordCountHBaseIT { + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + private static final byte[] COUNTS_COLFAM = Bytes.toBytes("cf"); + private static final 
byte[] WORD_COLFAM = Bytes.toBytes("cf"); + + private HBaseTestingUtility hbaseTestUtil = new HBaseTestingUtility(); + + @SuppressWarnings("serial") + public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, Result> words) { + PTable<String, Long> counts = Aggregate.count(words.parallelDo( + new DoFn<Pair<ImmutableBytesWritable, Result>, String>() { + @Override + public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<String> emitter) { + byte[] word = row.second().getValue(WORD_COLFAM, null); + if (word != null) { + emitter.emit(Bytes.toString(word)); + } + } + }, words.getTypeFamily().strings())); + + return counts.parallelDo("convert to put", new DoFn<Pair<String, Long>, Put>() { + @Override + public void process(Pair<String, Long> input, Emitter<Put> emitter) { + Put put = new Put(Bytes.toBytes(input.first())); + put.add(COUNTS_COLFAM, null, Bytes.toBytes(input.second())); + emitter.emit(put); + } + + }, Writables.writables(Put.class)); + } + + @Before + public void setUp() throws Exception { + Configuration conf = hbaseTestUtil.getConfiguration(); + conf.set("hadoop.log.dir", tmpDir.getFileName("logs")); + conf.set("hadoop.tmp.dir", tmpDir.getFileName("hadoop-tmp")); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + conf.setInt("hbase.master.info.port", -1); + conf.setInt("hbase.regionserver.info.port", -1); + + // Workaround for HBASE-5711, we need to set config value dfs.datanode.data.dir.perm + // equal to the permissions of the temp dirs on the filesystem. These temp dirs were + // probably created using this process' umask. So we guess the temp dir permissions as + // 0777 & ~umask, and use that to set the config value. 
+ try { + Process process = Runtime.getRuntime().exec("/bin/sh -c umask"); + BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream())); + int rc = process.waitFor(); + if(rc == 0) { + String umask = br.readLine(); + + int umaskBits = Integer.parseInt(umask, 8); + int permBits = 0777 & ~umaskBits; + String perms = Integer.toString(permBits, 8); + + conf.set("dfs.datanode.data.dir.perm", perms); + } + } catch (Exception e) { + // ignore errors, we might not be running on POSIX, or "sh" might not be on the path + } + + hbaseTestUtil.startMiniZKCluster(); + hbaseTestUtil.startMiniCluster(); + hbaseTestUtil.startMiniMapReduceCluster(1); + + // For Hadoop-2.0.0, we have to do a bit more work. + if (TaskAttemptContext.class.isInterface()) { + conf = hbaseTestUtil.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + Path tmpPath = new Path("target", "WordCountHBaseTest-tmpDir"); + FileSystem localFS = FileSystem.getLocal(conf); + for (FileStatus jarFile : localFS.listStatus(new Path("target/lib/"))) { + Path target = new Path(tmpPath, jarFile.getPath().getName()); + fs.copyFromLocalFile(jarFile.getPath(), target); + DistributedCache.addFileToClassPath(target, conf, fs); + } + + // Create a programmatic container for this jar. 
+ JarOutputStream jos = new JarOutputStream(new FileOutputStream("WordCountHBaseIT.jar")); + File baseDir = new File("target/test-classes"); + String prefix = "org/apache/crunch/io/hbase/"; + jarUp(jos, baseDir, prefix + "WordCountHBaseIT.class"); + jarUp(jos, baseDir, prefix + "WordCountHBaseIT$1.class"); + jarUp(jos, baseDir, prefix + "WordCountHBaseIT$2.class"); + jos.close(); + + Path target = new Path(tmpPath, "WordCountHBaseIT.jar"); + fs.copyFromLocalFile(true, new Path("WordCountHBaseIT.jar"), target); + DistributedCache.addFileToClassPath(target, conf, fs); + } + } + + private void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException { + File file = new File(baseDir, classDir); + JarEntry e = new JarEntry(classDir); + e.setTime(file.lastModified()); + jos.putNextEntry(e); + ByteStreams.copy(new FileInputStream(file), jos); + jos.closeEntry(); + } + + @Test + public void testWordCount() throws IOException { + run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration())); + } + + @After + public void tearDown() throws Exception { + hbaseTestUtil.shutdownMiniMapReduceCluster(); + hbaseTestUtil.shutdownMiniCluster(); + hbaseTestUtil.shutdownMiniZKCluster(); + } + + public void run(Pipeline pipeline) throws IOException { + + Random rand = new Random(); + int postFix = Math.abs(rand.nextInt()); + String inputTableName = "crunch_words_" + postFix; + String outputTableName = "crunch_counts_" + postFix; + + try { + + HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM); + HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM); + + int key = 0; + key = put(inputTable, key, "cat"); + key = put(inputTable, key, "cat"); + key = put(inputTable, key, "dog"); + Scan scan = new Scan(); + scan.addColumn(WORD_COLFAM, null); + HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan); + PTable<ImmutableBytesWritable, Result> shakespeare = 
pipeline.read(source); + pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName)); + pipeline.done(); + + assertIsLong(outputTable, "cat", 2); + assertIsLong(outputTable, "dog", 1); + } finally { + // not quite sure... + } + } + + protected int put(HTable table, int key, String value) throws IOException { + Put put = new Put(Bytes.toBytes(key)); + put.add(WORD_COLFAM, null, Bytes.toBytes(value)); + table.put(put); + return key + 1; + } + + protected void assertIsLong(HTable table, String key, long i) throws IOException { + Get get = new Get(Bytes.toBytes(key)); + get.addColumn(COUNTS_COLFAM, null); + Result result = table.get(get); + + byte[] rawCount = result.getValue(COUNTS_COLFAM, null); + assertTrue(rawCount != null); + assertEquals(new Long(i), new Long(Bytes.toLong(rawCount))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/it/java/org/apache/crunch/test/TemporaryPaths.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/test/TemporaryPaths.java b/crunch-hbase/src/it/java/org/apache/crunch/test/TemporaryPaths.java new file mode 100644 index 0000000..97cf0de --- /dev/null +++ b/crunch-hbase/src/it/java/org/apache/crunch/test/TemporaryPaths.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.test; + +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.hadoop.conf.Configuration; + + +/** + * Utilities for working with {@link TemporaryPath}. + */ +public final class TemporaryPaths { + + /** + * Static factory returning a {@link TemporaryPath} with adjusted + * {@link Configuration} properties. + */ + public static TemporaryPath create() { + return new TemporaryPath(RuntimeParameters.TMP_DIR, "hadoop.tmp.dir"); + } + + private TemporaryPaths() { + // nothing + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/AtHBase.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/AtHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/AtHBase.java new file mode 100644 index 0000000..33ed036 --- /dev/null +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/AtHBase.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hbase; + +import org.apache.crunch.SourceTarget; +import org.apache.hadoop.hbase.client.Scan; + + +/** + * Static factory methods for creating HBase {@link SourceTarget} types. + */ +public class AtHBase { + + public static HBaseSourceTarget table(String table) { + return table(table, new Scan()); + } + + public static HBaseSourceTarget table(String table, Scan scan) { + return new HBaseSourceTarget(table, scan); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java new file mode 100644 index 0000000..221de9b --- /dev/null +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hbase; + +import org.apache.crunch.Source; +import org.apache.crunch.TableSource; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + +/** + * Static factory methods for creating HBase {@link Source} types. + */ +public class FromHBase { + + public static TableSource<ImmutableBytesWritable, Result> table(String table) { + return table(table, new Scan()); + } + + public static TableSource<ImmutableBytesWritable, Result> table(String table, Scan scan) { + return new HBaseSourceTarget(table, scan); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java new file mode 100644 index 0000000..fcb9de1 --- /dev/null +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hbase; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.Pair; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.TableSource; +import org.apache.crunch.impl.mr.run.CrunchMapper; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.mapreduce.Job; + +public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>, + TableSource<ImmutableBytesWritable, Result> { + + private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf( + Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class)); + + protected Scan scan; + + public HBaseSourceTarget(String table, Scan scan) { + super(table); + 
this.scan = scan; + } + + @Override + public PType<Pair<ImmutableBytesWritable, Result>> getType() { + return PTYPE; + } + + @Override + public PTableType<ImmutableBytesWritable, Result> getTableType() { + return PTYPE; + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof HBaseSourceTarget)) { + return false; + } + HBaseSourceTarget o = (HBaseSourceTarget) other; + // XXX scan does not have equals method + return table.equals(o.table) && scan.equals(o.scan); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(table).append(scan).toHashCode(); + } + + @Override + public String toString() { + return "HBaseTable(" + table + ")"; + } + + @Override + public void configureSource(Job job, int inputId) throws IOException { + Configuration conf = job.getConfiguration(); + job.setInputFormatClass(TableInputFormat.class); + job.setMapperClass(CrunchMapper.class); + HBaseConfiguration.addHbaseResources(conf); + conf.set(TableInputFormat.INPUT_TABLE, table); + conf.set(TableInputFormat.SCAN, convertScanToString(scan)); + TableMapReduceUtil.addDependencyJars(job); + } + + static String convertScanToString(Scan scan) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out); + scan.write(dos); + return Base64.encodeBytes(out.toByteArray()); + } + + @Override + public long getSize(Configuration conf) { + // TODO something smarter here. 
+ return 1000L * 1000L * 1000L; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java new file mode 100644 index 0000000..050cff1 --- /dev/null +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.hbase; + +import java.io.IOException; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.io.MapReduceTarget; +import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapreduce.Job; + +public class HBaseTarget implements MapReduceTarget { + + protected String table; + + public HBaseTarget(String table) { + this.table = table; + } + + @Override + public boolean equals(Object other) { + if (this == other) + return true; + if (other == null) + return false; + if (!other.getClass().equals(getClass())) + return false; + HBaseTarget o = (HBaseTarget) other; + return table.equals(o.table); + } + + @Override + public int hashCode() { + HashCodeBuilder hcb = new HashCodeBuilder(); + return hcb.append(table).toHashCode(); + } + + @Override + public String toString() { + return "HBaseTable(" + table + ")"; + } + + @Override + public boolean accept(OutputHandler handler, PType<?> ptype) { + if (Put.class.equals(ptype.getTypeClass())) { + handler.configure(this, ptype); + return true; + } + return false; + } + + @Override + public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { + Configuration conf = job.getConfiguration(); + HBaseConfiguration.addHbaseResources(conf); + job.setOutputFormatClass(TableOutputFormat.class); + conf.set(TableOutputFormat.OUTPUT_TABLE, table); + try { + TableMapReduceUtil.addDependencyJars(job); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public <T> 
SourceTarget<T> asSourceTarget(PType<T> ptype) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java new file mode 100644 index 0000000..fa6b1a3 --- /dev/null +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hbase; + +import org.apache.crunch.Target; + +/** + * Static factory methods for creating HBase {@link Target} types. + */ +public class ToHBase { + + public static Target table(String table) { + return new HBaseTarget(table); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/pom.xml ---------------------------------------------------------------------- diff --git a/crunch/pom.xml b/crunch/pom.xml index 3902880..df41df3 100644 --- a/crunch/pom.xml +++ b/crunch/pom.xml @@ -57,6 +57,11 @@ under the License. 
</dependency> <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + + <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> </dependency> @@ -94,7 +99,13 @@ under the License. <artifactId>crunch-test</artifactId> <scope>test</scope> </dependency> - + + <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + <scope>test</scope> <!-- only needed for LocalJobRunner --> + </dependency> + <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> @@ -110,38 +121,10 @@ under the License. <artifactId>slf4j-log4j12</artifactId> </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> </dependencies> <build> <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <phase>test-compile</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/lib</outputDirectory> - <excludeArtifactIds>crunch-test</excludeArtifactIds> - </configuration> - </execution> - </executions> - </plugin> <!-- We put slow-running tests into src/it and run them during the integration-test phase using the failsafe plugin. 
This way developers can run unit tests conveniently from the IDE or via http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java b/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java deleted file mode 100644 index b96c125..0000000 --- a/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Random; -import java.util.jar.JarEntry; -import java.util.jar.JarOutputStream; - -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.io.hbase.HBaseSourceTarget; -import org.apache.crunch.io.hbase.HBaseTarget; -import org.apache.crunch.lib.Aggregate; -import org.apache.crunch.test.TemporaryPath; -import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.writable.Writables; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.TaskAttemptContext; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import com.google.common.io.ByteStreams; - -public class WordCountHBaseIT { - @Rule - public TemporaryPath tmpDir = TemporaryPaths.create(); - - private static final byte[] COUNTS_COLFAM = Bytes.toBytes("cf"); - private static final byte[] WORD_COLFAM = Bytes.toBytes("cf"); - - private HBaseTestingUtility hbaseTestUtil = new HBaseTestingUtility(); - - @SuppressWarnings("serial") - public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, 
Result> words) { - PTable<String, Long> counts = Aggregate.count(words.parallelDo( - new DoFn<Pair<ImmutableBytesWritable, Result>, String>() { - @Override - public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<String> emitter) { - byte[] word = row.second().getValue(WORD_COLFAM, null); - if (word != null) { - emitter.emit(Bytes.toString(word)); - } - } - }, words.getTypeFamily().strings())); - - return counts.parallelDo("convert to put", new DoFn<Pair<String, Long>, Put>() { - @Override - public void process(Pair<String, Long> input, Emitter<Put> emitter) { - Put put = new Put(Bytes.toBytes(input.first())); - put.add(COUNTS_COLFAM, null, Bytes.toBytes(input.second())); - emitter.emit(put); - } - - }, Writables.writables(Put.class)); - } - - @Before - public void setUp() throws Exception { - Configuration conf = hbaseTestUtil.getConfiguration(); - conf.set("hadoop.log.dir", tmpDir.getFileName("logs")); - conf.set("hadoop.tmp.dir", tmpDir.getFileName("hadoop-tmp")); - conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); - conf.setInt("hbase.master.info.port", -1); - conf.setInt("hbase.regionserver.info.port", -1); - - // Workaround for HBASE-5711, we need to set config value dfs.datanode.data.dir.perm - // equal to the permissions of the temp dirs on the filesystem. These temp dirs were - // probably created using this process' umask. So we guess the temp dir permissions as - // 0777 & ~umask, and use that to set the config value. 
- try { - Process process = Runtime.getRuntime().exec("/bin/sh -c umask"); - BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream())); - int rc = process.waitFor(); - if(rc == 0) { - String umask = br.readLine(); - - int umaskBits = Integer.parseInt(umask, 8); - int permBits = 0777 & ~umaskBits; - String perms = Integer.toString(permBits, 8); - - conf.set("dfs.datanode.data.dir.perm", perms); - } - } catch (Exception e) { - // ignore errors, we might not be running on POSIX, or "sh" might not be on the path - } - - hbaseTestUtil.startMiniZKCluster(); - hbaseTestUtil.startMiniCluster(); - hbaseTestUtil.startMiniMapReduceCluster(1); - - // For Hadoop-2.0.0, we have to do a bit more work. - if (TaskAttemptContext.class.isInterface()) { - conf = hbaseTestUtil.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - Path tmpPath = new Path("target", "WordCountHBaseTest-tmpDir"); - FileSystem localFS = FileSystem.getLocal(conf); - for (FileStatus jarFile : localFS.listStatus(new Path("target/lib/"))) { - Path target = new Path(tmpPath, jarFile.getPath().getName()); - fs.copyFromLocalFile(jarFile.getPath(), target); - DistributedCache.addFileToClassPath(target, conf, fs); - } - - // Create a programmatic container for this jar. 
- JarOutputStream jos = new JarOutputStream(new FileOutputStream("WordCountHBaseIT.jar")); - File baseDir = new File("target/test-classes"); - String prefix = "org/apache/crunch/"; - jarUp(jos, baseDir, prefix + "WordCountHBaseIT.class"); - jarUp(jos, baseDir, prefix + "WordCountHBaseIT$1.class"); - jarUp(jos, baseDir, prefix + "WordCountHBaseIT$2.class"); - jos.close(); - - Path target = new Path(tmpPath, "WordCountHBaseIT.jar"); - fs.copyFromLocalFile(true, new Path("WordCountHBaseIT.jar"), target); - DistributedCache.addFileToClassPath(target, conf, fs); - } - } - - private void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException { - File file = new File(baseDir, classDir); - JarEntry e = new JarEntry(classDir); - e.setTime(file.lastModified()); - jos.putNextEntry(e); - ByteStreams.copy(new FileInputStream(file), jos); - jos.closeEntry(); - } - - @Test - public void testWordCount() throws IOException { - run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration())); - } - - @After - public void tearDown() throws Exception { - hbaseTestUtil.shutdownMiniMapReduceCluster(); - hbaseTestUtil.shutdownMiniCluster(); - hbaseTestUtil.shutdownMiniZKCluster(); - } - - public void run(Pipeline pipeline) throws IOException { - - Random rand = new Random(); - int postFix = Math.abs(rand.nextInt()); - String inputTableName = "crunch_words_" + postFix; - String outputTableName = "crunch_counts_" + postFix; - - try { - - HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM); - HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM); - - int key = 0; - key = put(inputTable, key, "cat"); - key = put(inputTable, key, "cat"); - key = put(inputTable, key, "dog"); - Scan scan = new Scan(); - scan.addColumn(WORD_COLFAM, null); - HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan); - PTable<ImmutableBytesWritable, Result> shakespeare = 
pipeline.read(source); - pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName)); - pipeline.done(); - - assertIsLong(outputTable, "cat", 2); - assertIsLong(outputTable, "dog", 1); - } finally { - // not quite sure... - } - } - - protected int put(HTable table, int key, String value) throws IOException { - Put put = new Put(Bytes.toBytes(key)); - put.add(WORD_COLFAM, null, Bytes.toBytes(value)); - table.put(put); - return key + 1; - } - - protected void assertIsLong(HTable table, String key, long i) throws IOException { - Get get = new Get(Bytes.toBytes(key)); - get.addColumn(COUNTS_COLFAM, null); - Result result = table.get(get); - - byte[] rawCount = result.getValue(COUNTS_COLFAM, null); - assertTrue(rawCount != null); - assertEquals(new Long(i), new Long(Bytes.toLong(rawCount))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/At.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/At.java b/crunch/src/main/java/org/apache/crunch/io/At.java index 2d787e3..951b740 100644 --- a/crunch/src/main/java/org/apache/crunch/io/At.java +++ b/crunch/src/main/java/org/apache/crunch/io/At.java @@ -19,7 +19,6 @@ package org.apache.crunch.io; import org.apache.crunch.SourceTarget; import org.apache.crunch.io.avro.AvroFileSourceTarget; -import org.apache.crunch.io.hbase.HBaseSourceTarget; import org.apache.crunch.io.seq.SeqFileSourceTarget; import org.apache.crunch.io.seq.SeqFileTableSourceTarget; import org.apache.crunch.io.text.TextFileSourceTarget; @@ -28,7 +27,6 @@ import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.Scan; /** * Static factory methods for creating various {@link SourceTarget} types. 
@@ -43,14 +41,6 @@ public class At { return new AvroFileSourceTarget<T>(path, avroType); } - public static HBaseSourceTarget hbaseTable(String table) { - return hbaseTable(table, new Scan()); - } - - public static HBaseSourceTarget hbaseTable(String table, Scan scan) { - return new HBaseSourceTarget(table, scan); - } - public static <T> SeqFileSourceTarget<T> sequenceFile(String pathName, PType<T> ptype) { return sequenceFile(new Path(pathName), ptype); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/From.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/From.java b/crunch/src/main/java/org/apache/crunch/io/From.java index c7ae022..706be23 100644 --- a/crunch/src/main/java/org/apache/crunch/io/From.java +++ b/crunch/src/main/java/org/apache/crunch/io/From.java @@ -20,7 +20,6 @@ package org.apache.crunch.io; import org.apache.crunch.Source; import org.apache.crunch.TableSource; import org.apache.crunch.io.avro.AvroFileSource; -import org.apache.crunch.io.hbase.HBaseSourceTarget; import org.apache.crunch.io.impl.FileTableSourceImpl; import org.apache.crunch.io.seq.SeqFileSource; import org.apache.crunch.io.seq.SeqFileTableSourceTarget; @@ -31,9 +30,6 @@ import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; /** @@ -61,14 +57,6 @@ public class From { return new AvroFileSource<T>(path, avroType); } - public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table) { - return hbaseTable(table, new Scan()); - } - - public static TableSource<ImmutableBytesWritable, Result> 
hbaseTable(String table, Scan scan) { - return new HBaseSourceTarget(table, scan); - } - public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) { return sequenceFile(new Path(pathName), ptype); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/To.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java index 3190c64..faaa4d8 100644 --- a/crunch/src/main/java/org/apache/crunch/io/To.java +++ b/crunch/src/main/java/org/apache/crunch/io/To.java @@ -19,7 +19,6 @@ package org.apache.crunch.io; import org.apache.crunch.Target; import org.apache.crunch.io.avro.AvroFileTarget; -import org.apache.crunch.io.hbase.HBaseTarget; import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.io.seq.SeqFileTarget; import org.apache.crunch.io.text.TextFileTarget; @@ -48,10 +47,6 @@ public class To { return new AvroFileTarget(path); } - public static Target hbaseTable(String table) { - return new HBaseTarget(table); - } - public static Target sequenceFile(String pathName) { return sequenceFile(new Path(pathName)); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java deleted file mode 100644 index fcb9de1..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.hbase; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.crunch.Pair; -import org.apache.crunch.SourceTarget; -import org.apache.crunch.TableSource; -import org.apache.crunch.impl.mr.run.CrunchMapper; -import org.apache.crunch.types.PTableType; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.writable.Writables; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.util.Base64; -import org.apache.hadoop.mapreduce.Job; - -public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>, - TableSource<ImmutableBytesWritable, Result> { - - private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf( - Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class)); - - protected Scan scan; - - public HBaseSourceTarget(String table, Scan scan) { - super(table); - 
this.scan = scan; - } - - @Override - public PType<Pair<ImmutableBytesWritable, Result>> getType() { - return PTYPE; - } - - @Override - public PTableType<ImmutableBytesWritable, Result> getTableType() { - return PTYPE; - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof HBaseSourceTarget)) { - return false; - } - HBaseSourceTarget o = (HBaseSourceTarget) other; - // XXX scan does not have equals method - return table.equals(o.table) && scan.equals(o.scan); - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(table).append(scan).toHashCode(); - } - - @Override - public String toString() { - return "HBaseTable(" + table + ")"; - } - - @Override - public void configureSource(Job job, int inputId) throws IOException { - Configuration conf = job.getConfiguration(); - job.setInputFormatClass(TableInputFormat.class); - job.setMapperClass(CrunchMapper.class); - HBaseConfiguration.addHbaseResources(conf); - conf.set(TableInputFormat.INPUT_TABLE, table); - conf.set(TableInputFormat.SCAN, convertScanToString(scan)); - TableMapReduceUtil.addDependencyJars(job); - } - - static String convertScanToString(Scan scan) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(out); - scan.write(dos); - return Base64.encodeBytes(out.toByteArray()); - } - - @Override - public long getSize(Configuration conf) { - // TODO something smarter here. 
- return 1000L * 1000L * 1000L; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java deleted file mode 100644 index 050cff1..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.io.hbase; - -import java.io.IOException; - -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.crunch.SourceTarget; -import org.apache.crunch.impl.mr.run.CrunchRuntimeException; -import org.apache.crunch.io.MapReduceTarget; -import org.apache.crunch.io.OutputHandler; -import org.apache.crunch.types.PType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -import org.apache.hadoop.mapreduce.Job; - -public class HBaseTarget implements MapReduceTarget { - - protected String table; - - public HBaseTarget(String table) { - this.table = table; - } - - @Override - public boolean equals(Object other) { - if (this == other) - return true; - if (other == null) - return false; - if (!other.getClass().equals(getClass())) - return false; - HBaseTarget o = (HBaseTarget) other; - return table.equals(o.table); - } - - @Override - public int hashCode() { - HashCodeBuilder hcb = new HashCodeBuilder(); - return hcb.append(table).toHashCode(); - } - - @Override - public String toString() { - return "HBaseTable(" + table + ")"; - } - - @Override - public boolean accept(OutputHandler handler, PType<?> ptype) { - if (Put.class.equals(ptype.getTypeClass())) { - handler.configure(this, ptype); - return true; - } - return false; - } - - @Override - public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { - Configuration conf = job.getConfiguration(); - HBaseConfiguration.addHbaseResources(conf); - job.setOutputFormatClass(TableOutputFormat.class); - conf.set(TableOutputFormat.OUTPUT_TABLE, table); - try { - TableMapReduceUtil.addDependencyJars(job); - } catch (IOException e) { - throw new CrunchRuntimeException(e); - } - } - - @Override - public <T> 
SourceTarget<T> asSourceTarget(PType<T> ptype) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0e44105..6fcb21f 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ under the License. <modules> <module>crunch</module> + <module>crunch-hbase</module> <module>crunch-test</module> <module>crunch-examples</module> <module>crunch-scrunch</module> @@ -105,6 +106,12 @@ under the License. <dependency> <groupId>org.apache.crunch</groupId> + <artifactId>crunch-hbase</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.crunch</groupId> <artifactId>crunch-test</artifactId> <version>${project.version}</version> </dependency> @@ -134,6 +141,24 @@ under the License. </dependency> <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.4</version> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.4</version> + </dependency> + + <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + <version>3.0.1</version> + </dependency> + + <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.8.3</version> @@ -207,8 +232,14 @@ under the License. <dependency> <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.4.3</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> - <version>1.6.1</version> + <version>1.4.3</version> </dependency> <dependency>
