Repository: hbase Updated Branches: refs/heads/master 6b9b7cb8c -> 30f7d127c
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala new file mode 100644 index 0000000..89148c3 --- /dev/null +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility} +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.spark.{Logging, SparkContext} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} + +import scala.collection.mutable + +class HBaseRDDFunctionsSuite extends FunSuite with +BeforeAndAfterEach with BeforeAndAfterAll with Logging { + @transient var sc: SparkContext = null + var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility + + val tableName = "t1" + val columnFamily = "c" + + override def beforeAll() { + + TEST_UTIL.startMiniCluster + + logInfo(" - minicluster started") + try + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + catch { + case e: Exception => logInfo(" - no table " + tableName + " found") + + } + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily)) + logInfo(" - created table") + + sc = new SparkContext("local", "test") + } + + override def afterAll() { + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + logInfo("shuting down minicluster") + TEST_UTIL.shutdownMiniCluster() + + sc.stop() + } + + test("bulkput to test HBase client") { + val config = TEST_UTIL.getConfiguration + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) + + val hbaseContext = new HBaseContext(sc, config) + + rdd.hbaseBulkPut( + hbaseContext, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + put + }) + + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))) + assert(foo1 == "foo1") + + val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b")))) + assert(foo2 == "foo2") + + val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c")))) + assert(foo3 == "foo3") + + val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d")))) + assert(foo4 == "foo") + + val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e")))) + assert(foo5 == "bar") + } finally { + table.close() + connection.close() + } + } + + test("bulkDelete to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("delete1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("delete2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("delete3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + + val rdd = sc.parallelize(Array( + Bytes.toBytes("delete1"), + Bytes.toBytes("delete3"))) + + val hbaseContext = new HBaseContext(sc, config) + + rdd.hbaseBulkDelete(hbaseContext, + TableName.valueOf(tableName), + putRecord => new Delete(putRecord), + 4) + + assert(table.get(new Get(Bytes.toBytes("delete1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(table.get(new Get(Bytes.toBytes("delete3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2")) + } finally { + table.close() + connection.close() + } + + } + + test("bulkGet to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + //Get with custom convert logic + val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2, + record => { + new Get(record) + }, + (result: Result) => { + if (result.listCells() != null) { + val it = result.listCells().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(result.getRow) + ":") + + while (it.hasNext) { + val cell = it.next + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + "" + B.toString + } else { + "" + } + }) + + val getArray = getRdd.collect() + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + } + + test("bulkGet default converter to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + val getRdd = rdd.hbaseBulkGet(hbaseContext, TableName.valueOf("t1"), 2, + record => { + new Get(record) + }).map((row) => { + if (row != null && row._2.listCells() != null) { + val it = row._2.listCells().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(row._2.getRow) + ":") + + while (it.hasNext) { + val cell = it.next + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + "" + B.toString + } else { + "" + }}) + + val getArray = getRdd.collect() + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + } + + test("foreachPartition with puts to test HBase client") { + val config = TEST_UTIL.getConfiguration + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("2foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))), + (Bytes.toBytes("3foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))), + (Bytes.toBytes("5foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) + + val hbaseContext = new HBaseContext(sc, config) + + rdd.hbaseForeachPartition(hbaseContext, (it, conn) => { + val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1")) + it.foreach((putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + bufferedMutator.mutate(put) + }) + bufferedMutator.flush() + bufferedMutator.close() + }) + + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1foreach"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))) + assert(foo1 == "foo1") + + val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2foreach"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b")))) + assert(foo2 == "foo2") + + val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3foreach"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c")))) + assert(foo3 == "foo3") + + val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4foreach"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d")))) + assert(foo4 == "foo") + + val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e")))) + assert(foo5 == "bar") + } finally { + table.close() + connection.close() + } + } + + test("mapPartitions with Get from test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + //Get with custom convert logic + val getRdd = rdd.hbaseMapPartitions(hbaseContext, (it, conn) => { + val table = conn.getTable(TableName.valueOf("t1")) + var res = mutable.MutableList[String]() + + it.foreach(r => { + val get = new Get(r) + val result = table.get(get) + if (result.listCells != null) { + val it = result.listCells().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(result.getRow) + ":") + + while (it.hasNext) { + val cell = it.next() + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + res += "" + B.toString + } else { + res += "" + } + }) + res.iterator + }) + + val getArray = getRdd.collect() + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6e9b229..bc883a3 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ <module>hbase-rest</module> <module>hbase-checkstyle</module> <module>hbase-shaded</module> + <module>hbase-spark</module> </modules> <!--Add apache snapshots in case we want to use unreleased versions of plugins: e.g. surefire 2.18-SNAPSHOT-->
