HBASE-13992 Integrate SparkOnHBase into HBase

Signed-off-by: Sean Busbey <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30f7d127 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30f7d127 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30f7d127 Branch: refs/heads/master Commit: 30f7d127c3974cff9e3058e13d7c50805ee4482f Parents: 6b9b7cb Author: Ted Malaska <[email protected]> Authored: Tue Jul 28 11:10:37 2015 -0500 Committer: Sean Busbey <[email protected]> Committed: Tue Jul 28 11:45:23 2015 -0500 ---------------------------------------------------------------------- dev-support/test-patch.properties | 3 +- hbase-spark/pom.xml | 572 +++++++++++++++++++ .../JavaHBaseBulkDeleteExample.java | 80 +++ .../hbasecontext/JavaHBaseBulkGetExample.java | 115 ++++ .../hbasecontext/JavaHBaseBulkPutExample.java | 90 +++ .../hbasecontext/JavaHBaseDistributedScan.java | 81 +++ .../hbasecontext/JavaHBaseMapGetPutExample.java | 105 ++++ .../JavaHBaseStreamingBulkPutExample.java | 90 +++ .../hadoop/hbase/spark/HBaseContext.scala | 570 ++++++++++++++++++ .../hbase/spark/HBaseDStreamFunctions.scala | 158 +++++ .../hadoop/hbase/spark/HBaseRDDFunctions.scala | 162 ++++++ .../hadoop/hbase/spark/JavaHBaseContext.scala | 347 +++++++++++ .../hbasecontext/HBaseBulkDeleteExample.scala | 63 ++ .../hbasecontext/HBaseBulkGetExample.scala | 93 +++ .../hbasecontext/HBaseBulkPutExample.scala | 75 +++ .../HBaseBulkPutExampleFromFile.scala | 76 +++ .../HBaseBulkPutTimestampExample.scala | 77 +++ .../HBaseDistributedScanExample.scala | 61 ++ .../HBaseStreamingBulkPutExample.scala | 74 +++ .../example/rdd/HBaseBulkDeleteExample.scala | 64 +++ .../spark/example/rdd/HBaseBulkGetExample.scala | 88 +++ .../spark/example/rdd/HBaseBulkPutExample.scala | 76 +++ .../rdd/HBaseForeachPartitionExample.scala | 83 +++ .../example/rdd/HBaseMapPartitionExample.scala | 89 +++ .../hbase/spark/JavaHBaseContextSuite.java | 334 +++++++++++ .../hadoop/hbase/spark/HBaseContextSuite.scala | 344 +++++++++++ .../spark/HBaseDStreamFunctionsSuite.scala | 129 +++++ .../hbase/spark/HBaseRDDFunctionsSuite.scala | 398 +++++++++++++ pom.xml | 1 + 29 files changed, 4497 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/dev-support/test-patch.properties ---------------------------------------------------------------------- diff --git a/dev-support/test-patch.properties b/dev-support/test-patch.properties index c652e3f..7e75965 100644 --- a/dev-support/test-patch.properties +++ b/dev-support/test-patch.properties @@ -21,7 +21,8 @@ MAVEN_OPTS="${MAVEN_OPTS:-"-Xmx3100M"}" OK_RELEASEAUDIT_WARNINGS=0 # Allow four warnings. Javadoc complains about sun.misc.Unsafe use. # See HBASE-7457, HBASE-13761 -OK_JAVADOC_WARNINGS=7 +# Allow 2 additional warnings for Scala stub notice about MR. See HBASE-13992 +OK_JAVADOC_WARNINGS=9 MAX_LINE_LENGTH=100 http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml new file mode 100644 index 0000000..e48f9e8 --- /dev/null +++ b/hbase-spark/pom.xml @@ -0,0 +1,572 @@ +<?xml version="1.0"?> + +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation= + "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>hbase</artifactId> + <groupId>org.apache.hbase</groupId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <artifactId>hbase-spark</artifactId> + <name>Apache HBase - Spark</name> + + <properties> + <spark.version>1.3.0</spark.version> + <scala.version>2.10.4</scala.version> + <scala.binary.version>2.10</scala.binary.version> + <top.dir>${project.basedir}/..</top.dir> + </properties> + + <dependencies> + <!-- Force import of Spark's servlet API for unit tests --> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + <version>3.0.1</version> + <scope>test</scope> + </dependency> + + <!-- Mark Spark / Scala as provided --> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <!-- make sure wrong scala version is not pulled in --> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + <exclusion> + <!-- make sure wrong scala version is not pulled in --> + <groupId>org.scala-lang</groupId> + <artifactId>scalap</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <type>test-jar</type> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <version>2.2.4</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.scalamock</groupId> + <artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId> + <version>3.1.4</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop-two.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + 
<groupId>javax.servlet.jsp</groupId> + <artifactId>jsp-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop-two.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>jsp-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop-two.version}</version> + <type>test-jar</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>jsp-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop-two.version}</version> + <type>test-jar</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>jsp-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>thrift</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-api-2.1</artifactId> + </exclusion> + <exclusion> 
+ <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api-2.5</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>thrift</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-api-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api-2.5</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop2-compat</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>thrift</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + 
<artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-api-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api-2.5</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.jruby</groupId> + <artifactId>jruby-complete</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-it</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + + <build> + <testSourceDirectory>src/test/scala</testSourceDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.0</version> + <configuration> + <charset>${project.build.sourceEncoding}</charset> + <scalaVersion>${scala.version}</scalaVersion> + </configuration> + <executions> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>1.0</version> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <filereports>WDF TestSuite.txt</filereports> + <parallel>false</parallel> + </configuration> + <executions> + <execution> + <id>test</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <skipTests>true</skipTests> + </configuration> + </execution> + <execution> + <id>integration-test</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + 
<configuration> + <tagsToExclude>Integration-Test</tagsToExclude> + <argLine> + -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + </argLine> + <parallel>false</parallel> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java new file mode 100644 index 0000000..68b2edd --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import java.util.ArrayList; +import java.util.List; + +/** + * This is a simple example of deleting records in HBase + * with the bulkDelete function. 
+ */ +final public class JavaHBaseBulkDeleteExample { + + private JavaHBaseBulkDeleteExample() {} + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("JavaHBaseBulkDeleteExample {tableName}"); + return; + } + + String tableName = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkDeleteExample " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + List<byte[]> list = new ArrayList<>(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD<byte[]> rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkDelete(rdd, + TableName.valueOf(tableName), new DeleteFunction(), 4); + } finally { + jsc.stop(); + } + + } + + public static class DeleteFunction implements Function<byte[], Delete> { + private static final long serialVersionUID = 1L; + public Delete call(byte[] v) throws Exception { + return new Delete(v); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java new file mode 100644 index 0000000..c7dcbb6 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +/** + * This is a simple example of getting records in HBase + * with the bulkGet function. 
+ */ +final public class JavaHBaseBulkGetExample { + + private JavaHBaseBulkGetExample() {} + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("JavaHBaseBulkGetExample {tableName}"); + return; + } + + String tableName = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + List<byte[]> list = new ArrayList<>(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD<byte[]> rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(), + new ResultFunction()); + } finally { + jsc.stop(); + } + } + + public static class GetFunction implements Function<byte[], Get> { + + private static final long serialVersionUID = 1L; + + public Get call(byte[] v) throws Exception { + return new Get(v); + } + } + + public static class ResultFunction implements Function<Result, String> { + + private static final long serialVersionUID = 1L; + + public String call(Result result) throws Exception { + Iterator<Cell> it = result.listCells().iterator(); + StringBuilder b = new StringBuilder(); + + b.append(Bytes.toString(result.getRow())).append(":"); + + while (it.hasNext()) { + Cell cell = it.next(); + String q = Bytes.toString(cell.getQualifierArray()); + if (q.equals("counter")) { + b.append("(") + .append(Bytes.toString(cell.getQualifierArray())) + .append(",") + .append(Bytes.toLong(cell.getValueArray())) + .append(")"); + } else { + b.append("(") + .append(Bytes.toString(cell.getQualifierArray())) + .append(",") + .append(Bytes.toString(cell.getValueArray())) + .append(")"); + } + } + return b.toString(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java new file mode 100644 index 0000000..ded5081 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +/** + * This is a simple example of putting records in HBase + * with the bulkPut function. + */ +final public class JavaHBaseBulkPutExample { + + private JavaHBaseBulkPutExample() {} + + public static void main(String[] args) { + if (args.length < 2) { + System.out.println("JavaHBaseBulkPutExample " + + "{tableName} {columnFamily}"); + return; + } + + String tableName = args[0]; + String columnFamily = args[1]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + List<String> list = new ArrayList<>(); + list.add("1," + columnFamily + ",a,1"); + list.add("2," + columnFamily + ",a,2"); + list.add("3," + columnFamily + ",a,3"); + list.add("4," + columnFamily + ",a,4"); + list.add("5," + columnFamily + ",a,5"); + + JavaRDD<String> rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkPut(rdd, + TableName.valueOf(tableName), + new PutFunction()); + } finally { + jsc.stop(); + } + } + + public static class PutFunction implements Function<String, Put> { + + private static final long serialVersionUID = 1L; + + public Put call(String v) throws Exception { + String[] cells = v.split(","); + Put put = new Put(Bytes.toBytes(cells[0])); + + put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]), + Bytes.toBytes(cells[3])); + return put; + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java new file mode 100644 index 0000000..6192ad9 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import org.apache.spark.api.java.function.Function; +import scala.Tuple2; + +/** + * This is a simple example of scanning records from HBase + * with the hbaseRDD function. + */ +final public class JavaHBaseDistributedScan { + + private JavaHBaseDistributedScan() {} + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("JavaHBaseDistributedScan {tableName}"); + return; + } + + String tableName = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + Scan scan = new Scan(); + scan.setCaching(100); + + JavaRDD<Tuple2<ImmutableBytesWritable, Result>> javaRdd = + hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan); + + List<String> results = javaRdd.map(new ScanConvertFunction()).collect(); + + System.out.println("Result Size: " + results.size()); + } finally { + jsc.stop(); + } + } + + private static class ScanConvertFunction implements + Function<Tuple2<ImmutableBytesWritable, Result>, String> { + @Override + public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception { + return Bytes.toString(v1._1().copyBytes()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java new file mode 100644 index 0000000..0d41a70 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; + +import scala.Tuple2; + +/** + * This is a simple example of using the foreachPartition + * method with a HBase connection + */ +final public class JavaHBaseMapGetPutExample { + + private JavaHBaseMapGetPutExample() {} + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("JavaHBaseBulkGetExample {tableName}"); + return; + } + + final String tableName = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + List<byte[]> list = new ArrayList<>(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD<byte[]> rdd = jsc.parallelize(list); + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.foreachPartition(rdd, + new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() { + public void call(Tuple2<Iterator<byte[]>, Connection> t) + throws Exception { + Table table = t._2().getTable(TableName.valueOf(tableName)); + BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName)); + + while (t._1().hasNext()) { + byte[] b = t._1().next(); + Result r = table.get(new Get(b)); + if (r.getExists()) { + mutator.mutate(new Put(b)); + } + } + + mutator.flush(); + mutator.close(); + table.close(); + } + }); + } finally { + jsc.stop(); + } + } + + public static class GetFunction implements Function<byte[], Get> { + private static final long serialVersionUID = 1L; + public Get call(byte[] v) throws Exception { + return new Get(v); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java new file mode 100644 index 0000000..cd4cf24 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +/** + * This is a simple example of BulkPut with Spark Streaming + */ +final public class JavaHBaseStreamingBulkPutExample { + + private JavaHBaseStreamingBulkPutExample() {} + + public static void main(String[] args) { + if (args.length < 4) { + System.out.println("JavaHBaseBulkPutExample " + + "{host} {port} {tableName}"); + return; + } + + String host = args[0]; + String port = args[1]; + String tableName = args[2]; + + SparkConf sparkConf = + new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " + + tableName + ":" + port + ":" + tableName); + + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + JavaStreamingContext jssc = + new JavaStreamingContext(jsc, new Duration(1000)); + + JavaReceiverInputDStream<String> javaDstream = + jssc.socketTextStream(host, Integer.parseInt(port)); + + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.streamBulkPut(javaDstream, + TableName.valueOf(tableName), + new PutFunction()); + } finally { + jsc.stop(); + } + } + + public static class PutFunction implements Function<String, Put> { + + private static final long serialVersionUID = 1L; + + public Put call(String v) throws Exception { + String[] part = v.split(","); + Put put = new Put(Bytes.toBytes(part[0])); + + put.addColumn(Bytes.toBytes(part[1]), + Bytes.toBytes(part[2]), + Bytes.toBytes(part[3])); + return put; + } + + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala new file mode 100644 index 0000000..f060fea --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.ConnectionFactory +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Result +import scala.reflect.ClassTag +import org.apache.hadoop.hbase.client.Connection +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Delete +import org.apache.spark.{Logging, SerializableWritable, SparkContext} +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.hbase.client.Mutation +import org.apache.spark.streaming.dstream.DStream +import java.io._ +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod +import org.apache.hadoop.hbase.mapreduce.TableInputFormat +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper +import org.apache.hadoop.fs.{Path, FileSystem} + +/** + * HBaseContext is a façade for HBase operations + * like bulk put, get, increment, delete, and scan + * + * HBaseContext will take the responsibilities + * of disseminating the configuration information + * to the working and managing the life cycle of HConnections. + */ +class HBaseContext(@transient sc: SparkContext, + @transient config: Configuration, + val tmpHdfsConfgFile: String = null) + extends Serializable with Logging { + + @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials() + @transient var tmpHdfsConfiguration:Configuration = config + @transient var appliedCredentials = false + @transient val job = Job.getInstance(config) + TableMapReduceUtil.initCredentials(job) + val broadcastedConf = sc.broadcast(new SerializableWritable(config)) + val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials)) + + if (tmpHdfsConfgFile != null && config != null) { + val fs = FileSystem.newInstance(config) + val tmpPath = new Path(tmpHdfsConfgFile) + if (!fs.exists(tmpPath)) { + val outputStream = fs.create(tmpPath) + config.write(outputStream) + outputStream.close() + } else { + logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!") + } + } + + /** + * A simple enrichment of the traditional Spark RDD foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](rdd: RDD[T], + f: (Iterator[T], Connection) => Unit):Unit = { + rdd.foreachPartition( + it => hbaseForeachPartition(broadcastedConf, it, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + */ + def foreachPartition[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit):Unit = { + dstream.foreachRDD((rdd, time) => { + foreachPartition(rdd, f) + }) + } + + /** + * A simple enrichment of the traditional Spark RDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartitions[T, R: ClassTag](rdd: RDD[T], + mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = { + + rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf, + it, + mp)) + + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * foreachPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamForeachPartition[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit): Unit = { + + dstream.foreachRDD(rdd => this.foreachPartition(rdd, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamMapPartitions[T, U: ClassTag](dstream: DStream[T], + f: (Iterator[T], Connection) => Iterator[U]): + DStream[U] = { + dstream.mapPartitions(it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + f)) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take RDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the RDD to a HBase Put + */ + def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) { + + val tName = tableName.getName + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val m = connection.getBufferedMutator(TableName.valueOf(tName)) + iterator.foreach(T => m.mutate(f(T))) + m.flush() + m.close() + })) + } + + def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){ + credentials = SparkHadoopUtil.get.getCurrentUserCredentials() + + logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials) + + if (!appliedCredentials && credentials != null) { + appliedCredentials = true + + @transient val ugi = UserGroupInformation.getCurrentUser + ugi.addCredentials(credentials) + // specify that this is a proxy user + ugi.setAuthenticationMethod(AuthenticationMethod.PROXY) + + ugi.addCredentials(credentialsConf.value.value) + } + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a DStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the DStream to a HBase Put + */ + def streamBulkPut[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Put) = { + val tName = tableName.getName + dstream.foreachRDD((rdd, time) => { + bulkPut(rdd, TableName.valueOf(tName), f) + }) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a RDD and generate delete + * and send them to HBase. The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the RDD to a + * HBase Deletes + * @param batchSize The number of delete to batch before sending to HBase + */ + def bulkDelete[T](rdd: RDD[T], tableName: TableName, + f: (T) => Delete, batchSize: Integer) { + bulkMutation(rdd, tableName, f, batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. 
+ * + * It allow addition support for a user to take a DStream and + * generate Delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to delete from + * @param f function to convert a value in the DStream to a + * HBase Delete + * @param batchSize The number of deletes to batch before sending to HBase + */ + def streamBulkDelete[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Delete, + batchSize: Integer) = { + streamBulkMutation(dstream, tableName, f, batchSize) + } + + /** + * Under lining function to support all bulk mutations + * + * May be opened up if requested + */ + private def bulkMutation[T](rdd: RDD[T], tableName: TableName, + f: (T) => Mutation, batchSize: Integer) { + + val tName = tableName.getName + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val table = connection.getTable(TableName.valueOf(tName)) + val mutationList = new java.util.ArrayList[Mutation] + iterator.foreach(T => { + mutationList.add(f(T)) + if (mutationList.size >= batchSize) { + table.batch(mutationList, null) + mutationList.clear() + } + }) + if (mutationList.size() > 0) { + table.batch(mutationList, null) + mutationList.clear() + } + table.close() + })) + } + + /** + * Under lining function to support all bulk streaming mutations + * + * May be opened up if requested + */ + private def streamBulkMutation[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Mutation, + batchSize: Integer) = { + val tName = tableName.getName + dstream.foreachRDD((rdd, time) => { + bulkMutation(rdd, TableName.valueOf(tName), f, batchSize) + }) + } + + /** + * A simple abstraction over the HBaseContext.mapPartition method. + * + * It allow addition support for a user to take a RDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to get from + * @param makeGet function to convert a value in the RDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * RDD + * return new RDD that is created by the Get to HBase + */ + def bulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + rdd: RDD[T], + makeGet: (T) => Get, + convertResult: (Result) => U): RDD[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + rdd.mapPartitions[U](it => + hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run)) + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. 
+ * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + * @param tableName The name of the table to get from + * @param batchSize The number of Gets to be sent in a single batch + * @param dStream Original DStream with data to iterate over + * @param makeGet Function to convert a value in the DStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * DStream + * @return A new DStream that is created by the Get to HBase + */ + def streamBulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + dStream: DStream[T], + makeGet: (T) => Get, + convertResult: (Result) => U): DStream[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + dStream.mapPartitions[U](it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run)) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new RDD + * + * @param tableName the name of the table to scan + * @param scan the HBase scan object to use to read data from HBase + * @param f function to convert a Result object from HBase into + * what the user wants in the final generated RDD + * @return new RDD with results from scan + */ + def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan, + f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = { + + val job: Job = Job.getInstance(getConf(broadcastedConf)) + + TableMapReduceUtil.initCredentials(job) + TableMapReduceUtil.initTableMapperJob(tableName, scan, + classOf[IdentityTableMapper], null, null, job) + + sc.newAPIHadoopRDD(job.getConfiguration, + classOf[TableInputFormat], + classOf[ImmutableBytesWritable], + classOf[Result]).map(f) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that defines the + * type of the resulting RDD + * + * @param tableName the name of the table to scan + * @param scans the HBase scan object to use to read data from HBase + * @return New RDD with results from scan + * + */ + def hbaseRDD(tableName: TableName, scans: Scan): + RDD[(ImmutableBytesWritable, Result)] = { + + hbaseRDD[(ImmutableBytesWritable, Result)]( + tableName, + scans, + (r: (ImmutableBytesWritable, Result)) => r) + } + + /** + * underlining wrapper all foreach functions in HBaseContext + */ + private def hbaseForeachPartition[T](configBroadcast: + Broadcast[SerializableWritable[Configuration]], + it: Iterator[T], + f: (Iterator[T], Connection) => Unit) = { + + val config = getConf(configBroadcast) + + applyCreds(configBroadcast) + // specify that this is a proxy user + val connection = ConnectionFactory.createConnection(config) + f(it, connection) + connection.close() + } + + private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): + Configuration = { + + if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) { + val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf) + val inputStream = fs.open(new Path(tmpHdfsConfgFile)) + tmpHdfsConfiguration = new Configuration(false) + tmpHdfsConfiguration.readFields(inputStream) + inputStream.close() + } + + if (tmpHdfsConfiguration == null) { + try { + tmpHdfsConfiguration = configBroadcast.value.value + } catch { + case ex: Exception => logError("Unable to getConfig from broadcast", ex) + } + } + tmpHdfsConfiguration + } + + /** + * underlining wrapper all mapPartition functions 
+ *
+ */
+ private def hbaseMapPartition[K, U](
+ configBroadcast:
+ Broadcast[SerializableWritable[Configuration]],
+ it: Iterator[K],
+ mp: (Iterator[K], Connection) =>
+ Iterator[U]): Iterator[U] = {
+
+ val config = getConf(configBroadcast)
+ applyCreds(configBroadcast)
+
+ val connection = ConnectionFactory.createConnection(config)
+ val res = mp(it, connection)
+ connection.close()
+ res
+
+ }
+
+ /**
+ * underlying wrapper for all get mapPartition functions in HBaseContext
+ */
+ private class GetMapPartition[T, U](tableName: TableName,
+ batchSize: Integer,
+ makeGet: (T) => Get,
+ convertResult: (Result) => U)
+ extends Serializable {
+
+ val tName = tableName.getName
+
+ def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
+ val table = connection.getTable(TableName.valueOf(tName))
+
+ val gets = new java.util.ArrayList[Get]()
+ var res = List[U]()
+
+ while (iterator.hasNext) {
+ gets.add(makeGet(iterator.next()))
+
+ if (gets.size() == batchSize) {
+ val results = table.get(gets)
+ res = res ++ results.map(convertResult)
+ gets.clear()
+ }
+ }
+ if (gets.size() > 0) {
+ val results = table.get(gets)
+ res = res ++ results.map(convertResult)
+ gets.clear()
+ }
+ table.close()
+ res.iterator
+ }
+ }
+
+ /**
+ * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+ *
+ * This method is used to keep ClassTags out of the external Java API, as
+ * the Java compiler cannot produce them automatically. While this
+ * ClassTag-faking does please the compiler, it can cause problems at runtime
+ * if the Scala API relies on ClassTags for correctness.
+ *
+ * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
+ * just worse performance or security issues.
+ * For instance, an Array of AnyRef can hold any type T, but may lose primitive
+ * specialization.
+ */
+ private[spark]
+ def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}
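The hbaseRDD methods above are the read path of HBaseContext: they wrap the stock TableInputFormat so a Scan becomes an RDD. The following is a minimal sketch of how a caller might drive them; the table name "t1" is hypothetical, and the HBaseContext constructor shape (a SparkContext plus an HBase Configuration) is assumed from the rest of HBaseContext.scala, which is outside this hunk.

    import org.apache.hadoop.hbase.client.{Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.spark.{SparkConf, SparkContext}

    object HBaseRddSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("hbaseRDD-sketch"))
        // Assumed constructor shape; see the full HBaseContext.scala for the real signature.
        val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

        val scan = new Scan()
        scan.setCaching(100)

        // Scan the hypothetical table "t1" and keep only the row keys.
        val rowKeys = hbaseContext.hbaseRDD(TableName.valueOf("t1"), scan,
          (r: (ImmutableBytesWritable, Result)) => Bytes.toString(r._1.copyBytes()))

        println("rows scanned: " + rowKeys.count())
        sc.stop()
      }
    }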
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
new file mode 100644
index 0000000..d563a29
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseDStreamFunctions contains a set of implicit functions that can be
+ * applied to a Spark DStream so that we can easily interact with HBase
+ */
+object HBaseDStreamFunctions {
+
+ /**
+ * These are implicit methods for a DStream that contains any type of
+ * data.
+ *
+ * @param dStream This is for DStreams of any type
+ * @tparam T Type T
+ */
+ implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * put. This will not return a new DStream. Think of it like a foreach.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Puts will be sent to
+ * @param f The function that will turn the DStream values
+ * into HBase Put objects.
+ */
+ def hbaseBulkPut(hc: HBaseContext,
+ tableName: TableName,
+ f: (T) => Put): Unit = {
+ hc.streamBulkPut(dStream, tableName, f)
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * get. This will return a new DStream. Think of it as a DStream map
+ * function, in that every DStream value will get a new value out of
+ * HBase. That new value will populate the newly generated DStream.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Gets will be sent to
+ * @param batchSize How many gets to execute in a single batch
+ * @param f The function that will turn the DStream values
+ * into HBase Get objects
+ * @param convertResult The function that will convert a HBase
+ * Result object into a value that will go
+ * into the resulting DStream
+ * @tparam R The type of Object that will be coming
+ * out of the resulting DStream
+ * @return A resulting DStream with type R objects
+ */
+ def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+ tableName: TableName,
+ batchSize:Int, f: (T) => Get, convertResult: (Result) => R):
+ DStream[R] = {
+ hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * get. This will return a new DStream. Think of it as a DStream map
+ * function, in that every DStream value will get a new value out of
+ * HBase. That new value will populate the newly generated DStream.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Gets will be sent to
+ * @param batchSize How many gets to execute in a single batch
+ * @param f The function that will turn the DStream values
+ * into HBase Get objects
+ * @return A resulting DStream of (ImmutableBytesWritable, Result) tuples
+ */
+ def hbaseBulkGet(hc: HBaseContext,
+ tableName: TableName, batchSize:Int,
+ f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
+ hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
+ tableName, batchSize, dStream, f,
+ result => (new ImmutableBytesWritable(result.getRow), result))
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * Delete. This will not return a new DStream.
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param tableName The tableName that the deletes will be sent to
+ * @param f The function that will convert the DStream value into
+ * a HBase Delete object
+ * @param batchSize The number of Deletes to be sent in a single batch
+ */
+ def hbaseBulkDelete(hc: HBaseContext,
+ tableName: TableName,
+ f:(T) => Delete, batchSize:Int): Unit = {
+ hc.streamBulkDelete(dStream, tableName, f, batchSize)
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * foreachPartition method. This will act very much like a normal DStream
+ * foreach method, except that you will now have a HBase connection
+ * while iterating through the values.
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param f This function will get an iterator for a Partition of a
+ * DStream along with a connection object to HBase
+ */
+ def hbaseForeachPartition(hc: HBaseContext,
+ f: (Iterator[T], Connection) => Unit): Unit = {
+ hc.streamForeachPartition(dStream, f)
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * mapPartitions method. This will act very much like a normal DStream
+ * mapPartitions method, except that you will now have a
+ * HBase connection while iterating through the values
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param f This function will get an iterator for a Partition of a
+ * DStream along with a connection object to HBase
+ * @tparam R This is the type of objects that will go into the resulting
+ * DStream
+ * @return A resulting DStream of type R
+ */
+ def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+ f: (Iterator[T], Connection) => Iterator[R]):
+ DStream[R] = {
+ hc.streamMapPartitions(dStream, f)
+ }
+ }
+}
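With the implicit class above imported, any DStream picks up these methods directly. Below is a minimal sketch of the bulk-put path under the same assumptions as the earlier sketch: the table "t1", column family "c", and qualifier "q" are hypothetical, and the HBaseContext constructor shape is assumed from HBaseContext.scala.

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingBulkPutSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("dstream-bulkput-sketch"))
        val ssc = new StreamingContext(sc, Seconds(1))
        // Assumed constructor shape; defined outside this hunk.
        val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

        // Each line arriving on the socket becomes one Put keyed by the line itself.
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"),
          (line: String) => new Put(Bytes.toBytes(line))
            .addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), Bytes.toBytes(line)))

        ssc.start()
        ssc.awaitTermination()
      }
    }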
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
new file mode 100644
index 0000000..fb8456d
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseRDDFunctions contains a set of implicit functions that can be
+ * applied to a Spark RDD so that we can easily interact with HBase
+ */
+object HBaseRDDFunctions
+{
+
+ /**
+ * These are implicit methods for an RDD that contains any type of
+ * data.
+ *
+ * @param rdd This is for RDDs of any type
+ * @tparam T This is any type
+ */
+ implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * put. This will not return a new RDD. Think of it like a foreach.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Puts will be sent to
+ * @param f The function that will turn the RDD values
+ * into HBase Put objects.
+ */
+ def hbaseBulkPut(hc: HBaseContext,
+ tableName: TableName,
+ f: (T) => Put): Unit = {
+ hc.bulkPut(rdd, tableName, f)
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * get. This will return a new RDD. Think of it as an RDD map
+ * function, in that every RDD value will get a new value out of
+ * HBase. That new value will populate the newly generated RDD.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Gets will be sent to
+ * @param batchSize How many gets to execute in a single batch
+ * @param f The function that will turn the RDD values
+ * into HBase Get objects
+ * @param convertResult The function that will convert a HBase
+ * Result object into a value that will go
+ * into the resulting RDD
+ * @tparam R The type of Object that will be coming
+ * out of the resulting RDD
+ * @return A resulting RDD with type R objects
+ */
+ def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+ tableName: TableName, batchSize:Int,
+ f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
+ hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * get. This will return a new RDD. Think of it as an RDD map
+ * function, in that every RDD value will get a new value out of
+ * HBase. That new value will populate the newly generated RDD.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Gets will be sent to
+ * @param batchSize How many gets to execute in a single batch
+ * @param f The function that will turn the RDD values
+ * into HBase Get objects
+ * @return A resulting RDD of (ImmutableBytesWritable, Result) tuples
+ */
+ def hbaseBulkGet(hc: HBaseContext,
+ tableName: TableName, batchSize:Int,
+ f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
+ hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
+ batchSize, rdd, f,
+ result => if (result != null && result.getRow != null) {
+ (new ImmutableBytesWritable(result.getRow), result)
+ } else {
+ null
+ })
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * Delete. This will not return a new RDD.
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param tableName The tableName that the deletes will be sent to
+ * @param f The function that will convert the RDD value into
+ * a HBase Delete object
+ * @param batchSize The number of Deletes to be sent in a single batch
+ */
+ def hbaseBulkDelete(hc: HBaseContext,
+ tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
+ hc.bulkDelete(rdd, tableName, f, batchSize)
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * foreachPartition method. This will act very much like a normal RDD
+ * foreach method, except that you will now have a HBase connection
+ * while iterating through the values.
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param f This function will get an iterator for a Partition of an
+ * RDD along with a connection object to HBase
+ */
+ def hbaseForeachPartition(hc: HBaseContext,
+ f: (Iterator[T], Connection) => Unit): Unit = {
+ hc.foreachPartition(rdd, f)
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * mapPartitions method. This will act very much like a normal RDD
+ * mapPartitions method, except that you will now have a
+ * HBase connection while iterating through the values
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param f This function will get an iterator for a Partition of an
+ * RDD along with a connection object to HBase
+ * @tparam R This is the type of objects that will go into the resulting
+ * RDD
+ * @return A resulting RDD of type R
+ */
+ def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+ f: (Iterator[T], Connection) => Iterator[R]):
+ RDD[R] = {
+ hc.mapPartitions[T,R](rdd, f)
+ }
+ }
+}
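The RDD implicits mirror the DStream ones. The sketch below exercises the typed hbaseBulkGet, again against the hypothetical table "t1" and with the HBaseContext constructor shape assumed; the "<missing>" sentinel for absent rows is purely illustrative. It turns an RDD of row keys into an RDD of (rowKey, cellCount) pairs.

    import org.apache.hadoop.hbase.client.{Get, Result}
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.spark.{SparkConf, SparkContext}

    object RddBulkGetSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("rdd-bulkget-sketch"))
        // Assumed constructor shape; defined outside this hunk.
        val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

        // Row keys to look up in the hypothetical table "t1".
        val rowKeys = sc.parallelize(Seq("row1", "row2", "row3"))

        // Two Gets per batch; missing rows come back as empty Results.
        val cellCounts = rowKeys.hbaseBulkGet[(String, Int)](
          hbaseContext, TableName.valueOf("t1"), 2,
          (key: String) => new Get(Bytes.toBytes(key)),
          (result: Result) =>
            if (result != null && !result.isEmpty) {
              (Bytes.toString(result.getRow), result.rawCells().length)
            } else {
              ("<missing>", 0)
            })

        cellCounts.collect().foreach(println)
        sc.stop()
      }
    }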
