Updated Branches: refs/heads/master 60d6ca5de -> 012e924e6
CRUNCH-31: Crunch HBase example, contributed by Gauthier Ambard.

Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/012e924e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/012e924e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/012e924e

Branch: refs/heads/master
Commit: 012e924e6a2900065a7f9fefd2265136d65af2ee
Parents: 60d6ca5
Author: jwills <[email protected]>
Authored: Mon Aug 20 00:30:26 2012 -0700
Committer: jwills <[email protected]>
Committed: Mon Aug 20 00:30:26 2012 -0700

----------------------------------------------------------------------
 crunch-examples/pom.xml                          |    5 +
 .../crunch/examples/WordAggregationHBase.java    |  250 +++++++++++++++
 2 files changed, 255 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/012e924e/crunch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-examples/pom.xml b/crunch-examples/pom.xml
index 66465ca..c07207a 100644
--- a/crunch-examples/pom.xml
+++ b/crunch-examples/pom.xml
@@ -45,6 +45,11 @@ under the License.
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-hbase</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/012e924e/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
new file mode 100644
index 0000000..680752f
--- /dev/null
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.hbase.HBaseSourceTarget;
+import org.apache.crunch.io.hbase.HBaseTarget;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads per-character quotes from an HBase table, concatenates the quotes
+ * belonging to the same play, and writes the result back to HBase.
+ *
+ * You need to have an HBase instance running. This example also requires a
+ * dependency on hbase, whose version should match your HBase installation:
+ *
+ * <dependency>
+ *   <groupId>org.apache.hbase</groupId>
+ *   <artifactId>hbase</artifactId>
+ *   <version>...</version>
+ * </dependency>
+ */
+@SuppressWarnings("serial")
+public class WordAggregationHBase extends Configured implements Tool, Serializable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(WordAggregationHBase.class);
+
+  // Configuration parameters. Here configured for an HBase instance running
+  // locally.
+  private static String HBASE_CONFIGURATION_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static String HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT = "hbase.zookeeper.property.clientPort";
+  private static String hbaseZookeeperQuorum = "localhost";
+  private static String hbaseZookeeperClientPort = "2181";
+
+  // HBase parameters
+  private final String TABLE_SOURCE = "list";
+  private final String TABLE_TARGET = "aggregation";
+
+  private final byte[] COLUMN_FAMILY_SOURCE = Bytes.toBytes("content");
+  private final byte[] COLUMN_QUALIFIER_SOURCE_PLAY = Bytes.toBytes("play");
+  private final byte[] COLUMN_QUALIFIER_SOURCE_QUOTE = Bytes.toBytes("quote");
+
+  private final byte[] COLUMN_FAMILY_TARGET = Bytes.toBytes("aggregation");
+  private final byte[] COLUMN_QUALIFIER_TARGET_TEXT = Bytes.toBytes("text");
+
+  @Override
+  public int run(final String[] args) throws Exception {
+    // We create the test rows first.
+    String type1 = "romeo and juliet";
+    String type2 = "macbeth";
+
+    String quote1 = "That which we call a rose By any other word would smell as sweet";
+    String quote2 = "But, soft! what light through yonder window breaks? It is the east, and Juliet is the sun.";
+    String quote3 = "But first, let me tell ye, if you should lead her in a fool's paradise, as they say,";
+    String quote4 = "Fair is foul, and foul is fair";
+    String quote5 = "But screw your courage to the sticking-place, And we'll not fail.";
+
+    String[] character = { "juliet", "romeo", "nurse", "witch", "macbeth" };
+    String[] type = { type1, type1, type1, type2, type2 };
+    String[] quote = { quote1, quote2, quote3, quote4, quote5 };
+
+    List<Put> putList = createPuts(Arrays.asList(character), Arrays.asList(type), Arrays.asList(quote));
+
+    // We create the tables and fill the source.
+    Configuration configuration = getConf();
+
+    createTable(configuration, TABLE_SOURCE, Bytes.toString(COLUMN_FAMILY_SOURCE));
+    createTable(configuration, TABLE_TARGET, Bytes.toString(COLUMN_FAMILY_TARGET));
+
+    putInHbase(putList, configuration);
+
+    // We create the pipeline which will handle most of the job.
+    Pipeline pipeline = new MRPipeline(WordAggregationHBase.class, HBaseConfiguration.create());
+
+    // The scan which will retrieve the data from the source table in HBase.
+    Scan scan = new Scan();
+    scan.addColumn(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY);
+    scan.addColumn(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE);
+
+    // Our HBase source.
+    HBaseSourceTarget source = new HBaseSourceTarget(TABLE_SOURCE, scan);
+
+    // Our source, in a format which can be used by Crunch.
+    PTable<ImmutableBytesWritable, Result> rawText = pipeline.read(source);
+
+    // We process the data from the source HTable, then concatenate all
+    // values that share the same rowkey.
+    PTable<String, String> textExtracted = extractText(rawText);
+    CombineFn<String, String> stringConcatCombine = CombineFn.STRING_CONCAT(" ", true);
+    PTable<String, String> result = textExtracted.groupByKey().combineValues(stringConcatCombine);
+
+    // We create the collection of puts from the concatenated data.
+    PCollection<Put> resultPut = createPut(result);
+
+    // We write the puts into the target table in HBase.
+    pipeline.write(resultPut, new HBaseTarget(TABLE_TARGET));
+
+    pipeline.done();
+    return 0;
+  }
+
+  /**
+   * Writes the puts to HBase.
+   *
+   * @param putList the puts
+   * @param conf the HBase configuration
+   * @throws IOException
+   */
+  private void putInHbase(final List<Put> putList, final Configuration conf) throws IOException {
+    HTable htable = new HTable(conf, TABLE_SOURCE);
+    try {
+      htable.put(putList);
+    } finally {
+      htable.close();
+    }
+  }
+
+  /**
+   * Creates a table if it doesn't already exist.
+   *
+   * @param conf the HBase configuration
+   * @param htableName the table name
+   * @param families the column family names
+   * @throws MasterNotRunningException
+   * @throws ZooKeeperConnectionException
+   * @throws IOException
+   */
+  private void createTable(final Configuration conf, final String htableName, final String... families) throws MasterNotRunningException, ZooKeeperConnectionException,
+      IOException {
+    HBaseAdmin hbase = new HBaseAdmin(conf);
+    if (!hbase.tableExists(htableName)) {
+      HTableDescriptor desc = new HTableDescriptor(htableName);
+      for (String s : families) {
+        HColumnDescriptor meta = new HColumnDescriptor(s);
+        desc.addFamily(meta);
+      }
+      hbase.createTable(desc);
+    }
+  }
+
+  /**
+   * Creates a list of puts.
+   *
+   * @param character the rowkeys
+   * @param play the plays (in column COLUMN_QUALIFIER_SOURCE_PLAY)
+   * @param quote the quotes (in column COLUMN_QUALIFIER_SOURCE_QUOTE)
+   * @return a list of puts, one per character
+   * @throws IllegalArgumentException if the lists differ in size
+   */
+  private List<Put> createPuts(final List<String> character, final List<String> play, final List<String> quote) throws IllegalArgumentException {
+    List<Put> list = new ArrayList<Put>();
+    if (character.size() != play.size() || quote.size() != play.size()) {
+      LOGGER.error("Every list should have the same number of elements");
+      throw new IllegalArgumentException("Every list should have the same number of elements");
+    }
+    for (int i = 0; i < character.size(); i++) {
+      Put put = new Put(Bytes.toBytes(character.get(i)));
+      put.add(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY, Bytes.toBytes(play.get(i)));
+      put.add(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE, Bytes.toBytes(quote.get(i)));
+      list.add(put);
+    }
+    return list;
+  }
+
+  /**
+   * Extracts information from HBase.
+   *
+   * @param words the source from HBase
+   * @return a <code>PTable</code> composed of the play of each input row as
+   *         key and its quote as value
+   */
+  public PTable<String, String> extractText(final PTable<ImmutableBytesWritable, Result> words) {
+    return words.parallelDo("Extract text", new DoFn<Pair<ImmutableBytesWritable, Result>, Pair<String, String>>() {
+      @Override
+      public void process(final Pair<ImmutableBytesWritable, Result> row, final Emitter<Pair<String, String>> emitter) {
+        byte[] type = row.second().getValue(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY);
+        byte[] def = row.second().getValue(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE);
+        if (type != null && def != null) {
+          emitter.emit(new Pair<String, String>(Bytes.toString(type), Bytes.toString(def)));
+        }
+      }
+    }, Writables.tableOf(Writables.strings(), Writables.strings()));
+  }
+
+  /**
+   * Creates puts in order to insert them into HBase.
+   *
+   * @param extractedText
+   *          a PTable which contains the data needed to create the puts:
+   *          the keys of the PTable are the rowkeys for the puts, the
+   *          values are the values for HBase.
+   * @return a PCollection of the puts.
+   */
+  public PCollection<Put> createPut(final PTable<String, String> extractedText) {
+    return extractedText.parallelDo("Convert to puts", new DoFn<Pair<String, String>, Put>() {
+      @Override
+      public void process(final Pair<String, String> input, final Emitter<Put> emitter) {
+        Put put = new Put(Bytes.toBytes(input.first()));
+        put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second()));
+        emitter.emit(put);
+      }
+    }, Writables.writables(Put.class));
+  }
+
+  public static void main(final String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    // HBase configuration.
+    conf.set(HBASE_CONFIGURATION_ZOOKEEPER_QUORUM, hbaseZookeeperQuorum);
+    conf.set(HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT, hbaseZookeeperClientPort);
+    ToolRunner.run(conf, new WordAggregationHBase(), args);
+  }
+}
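
----------------------------------------------------------------------

A quick way to check what the pipeline wrote is to scan the target table
once the job has finished. The following is a minimal sketch against the
same local HBase instance and the same client API version used above; the
class name VerifyAggregation is illustrative only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyAggregation {
  public static void main(final String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Same ZooKeeper settings as WordAggregationHBase.main().
    conf.set("hbase.zookeeper.quorum", "localhost");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    // Scan the target table written by the example.
    HTable table = new HTable(conf, "aggregation");
    try {
      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (Result result : scanner) {
          byte[] text = result.getValue(Bytes.toBytes("aggregation"), Bytes.toBytes("text"));
          // The rowkey is the play; the value is the concatenation of its quotes.
          System.out.println(Bytes.toString(result.getRow()) + " -> " + Bytes.toString(text));
        }
      } finally {
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}

Each printed line should show a play name followed by the quotes that
CombineFn.STRING_CONCAT joined for that play.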
