Hi --
I've created a builder-style HBase client DSL for Groovy -- currently
it just wraps the client API to make inserts and row scans a little
easier. There is probably plenty of room for improvement, so I wanted to
submit it to the community. Any feedback or suggestions are welcome.
Here's an example:
def hbase = HBase.connect() // may optionally pass host name
/* Create: this will create a table if it does not exist, or disable
& update column families
if the table already does exist. The table will be enabled when
the create statement returns */
hbase.create( 'myTable' ) {
family( 'familyOne' ) {
inMemory = true
bloomFilter = false
}
}
/* Insert/ update rows: */
hbase.update( 'myTable' ) {
row( 'rowOne' ) {
family( 'familyOne' ) {
col 'one', 'someValue'
col 'two', 'anotherValue'
col 'three', 1234
}
// alternate form that doesn't use nested family name:
col 'familyOne:four', 12345
}
row( 'rowTwo' ) { /* more column values */ }
// etc
}
So a more realistic example -- if you were iterating through some data
and inserting it -- would look like this:
hbase.update( 'myTable' ) {
new URL( someCSV ).eachLine { line ->
def values = line.split(',')
row( values[0] ) {
col 'fam1:val1', values[1]
// etc...
}
}
}
There is also a wrapper for the scanner API:
hbase.scan( cols : ['fam:col1', 'fam:col2'],
start : '001', end : '200',
// any timestamp args may be long, Date or Calendar
timestamp : Date.parse( 'yy/MM/dd HH:mm:ss', '08/11/25 05:00:00' )
) { row ->
// each row is a RowResult instance -- which is a Map! So all map
// operations are valid here:
row.each { println "${it.key} : ${it.value}" }
}
/*
* Copyright 2003-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.enernoc.rnd.eps.groovy
import groovy.lang.Closure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Builder-style HBase client DSL for Groovy.
 *
 * <p>Instances are obtained through one of the static <code>connect</code>
 * factories. Tables are created or altered with
 * {@link #create(String, Closure)}, rows are written with the
 * <code>update</code> family of methods and read with the <code>scan</code>
 * family of methods. The closures passed to these methods are executed with
 * a delegate object that supplies the DSL vocabulary (<code>family</code>,
 * <code>row</code>, <code>col</code>).</p>
 *
 * @author Tom Nichols ([EMAIL PROTECTED])
 */
public class HBase {
    protected Logger log = LoggerFactory.getLogger(getClass());

    /**
     * Value a scan closure may return to stop the scan before the scanner
     * is exhausted.
     */
    public static final String SCAN_BREAK = "com.enernoc.rnd.eps.SCANNER_BREAK";

    HBaseConfiguration conf;
    HBaseAdmin admin;
    /** Default table to use for update/ scan operations */
    HTable table;

    /** Instances are created through the <code>connect</code> factories. */
    protected HBase() {}

    /**
     * Create a client that uses a default {@link HBaseConfiguration}.
     *
     * @return a new client with no default table
     */
    public static HBase connect() {
        HBase hb = new HBase();
        hb.setConf( new HBaseConfiguration() );
        return hb;
    }

    /**
     * Create a client whose configuration points at the given master.
     *
     * @param host value stored under {@link HConstants#MASTER_ADDRESS}
     * @return a new client with no default table
     */
    public static HBase connect( String host ) {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.set( HConstants.MASTER_ADDRESS, host );
        HBase hb = new HBase();
        hb.setConf( conf );
        return hb;
    }

    /**
     * Named-argument form of <code>connect</code>.
     *
     * @param args may contain <code>host</code> (see {@link #connect(String)})
     *        and <code>table</code> (see {@link #setTableName(String)}); both
     *        are optional
     * @return a new client
     * @throws IOException if the default table cannot be opened
     */
    public static HBase connect( Map<String, Object> args ) throws IOException {
        String host = (String)args.get("host");
        HBase hb = host != null ? HBase.connect( host ) : HBase.connect();
        hb.setTableName( (String)args.get("table") );
        return hb;
    }

    /**
     * Create the named table, or modify it if it already exists. An existing
     * table is disabled before <code>tableConfig</code> runs; the table is
     * enabled again before this method returns.
     *
     * @param tableName name of the table to create or modify
     * @param tableConfig closure run with a {@link CreateDelegate} as its
     *        delegate (delegate-first) and the table descriptor as its argument
     * @return the descriptor that was created or updated
     * @throws MasterNotRunningException if the admin client cannot be created
     * @throws IOException if an admin operation fails, or if the table is
     *         reported to exist but no descriptor for it is listed
     */
    public HTableDescriptor create( String tableName, Closure tableConfig ) throws MasterNotRunningException, IOException {
        final HBaseAdmin hbaseAdmin = getAdmin();
        HTableDescriptor descriptor = null;
        if ( hbaseAdmin.tableExists( tableName ) ) {
            for ( HTableDescriptor td : hbaseAdmin.listTables() ) {
                if ( td.getNameAsString().equals( tableName ) ) {
                    descriptor = td;
                    break;
                }
            }
            // Fail with a clear message instead of a NullPointerException
            // further down if the table vanished between the two admin calls.
            if ( descriptor == null ) {
                throw new IOException( "Table '" + tableName
                        + "' exists but no descriptor was found for it" );
            }
            log.info( "Taking table {} offline for modification", tableName );
            /* table needs to be disabled before calling the create delegate
               where family configuration is done on a live table descriptor */
            if ( hbaseAdmin.isTableEnabled(tableName) ) hbaseAdmin.disableTable( tableName );
        }
        else {
            log.info( "Creating table '{}'...", tableName );
            descriptor = new HTableDescriptor( tableName );
        }

        CreateDelegate cd = new CreateDelegate( hbaseAdmin, descriptor );
        tableConfig.setDelegate( cd );
        tableConfig.setResolveStrategy( Closure.DELEGATE_FIRST );
        tableConfig.call( descriptor );

        if ( hbaseAdmin.tableExists( tableName ) ) {
            // Bytes.toBytes is the same String encoder getBytes(Object) uses,
            // so the key does not depend on the platform default charset.
            hbaseAdmin.modifyTableMeta( Bytes.toBytes( tableName ), descriptor );
            log.info( "Updated table: {}", descriptor );
        }
        else {
            hbaseAdmin.createTable( descriptor );
            log.info( "Created table: {}", descriptor );
        }
        if ( ! hbaseAdmin.isTableEnabled( tableName ) ) hbaseAdmin.enableTable( tableName );
        log.debug( "Enabled table: {}", tableName );
        return descriptor;
    }

    /**
     * Short form of {@link #update(HTable, Closure)} that uses the default
     * table defined by {@link #setTableName(String)}.
     *
     * @throws IllegalStateException if no default table has been set
     */
    public HTable update( Closure updateClosure ) throws IOException {
        return this.update( requireDefaultTable(), updateClosure );
    }

    /**
     * Alternate form of {@link #update(HTable, Closure)} which opens the
     * table named <code>tableName</code> using this client's configuration.
     */
    public HTable update( String tableName, Closure updateClosure ) throws IOException {
        return this.update( new HTable(conf, tableName), updateClosure );
    }

    /**
     * Run <code>updateClosure</code> with an {@link UpdateDelegate} as its
     * delegate (delegate-first), then commit every {@link BatchUpdate} the
     * closure produced, in the order the rows were declared. Nothing is
     * committed until the closure has returned.
     *
     * @param table table the updates are committed to; also passed to the
     *        closure as its argument
     * @param updateClosure closure that declares rows via <code>row</code>
     * @return <code>table</code>
     * @throws IOException if a commit fails
     */
    public HTable update( HTable table, Closure updateClosure ) throws IOException {
        UpdateDelegate delegate = new UpdateDelegate();
        updateClosure.setDelegate( delegate );
        updateClosure.setResolveStrategy( Closure.DELEGATE_FIRST );
        updateClosure.call( table );

        final boolean debug = log.isDebugEnabled();
        // Decode the table name once, and only when it will be logged.
        final String tableLabel = debug ? new String(table.getTableName()) : null;
        for ( BatchUpdate update : delegate.getUpdates() ) {
            table.commit( update );
            if ( debug ) log.debug( "Committed update to table '{}' : {}", tableLabel, update );
        }
        return table;
    }

    /**
     * Short form of {@link #scan(Map,HTable,Closure)}. This method uses
     * the default table defined in {@link #setTableName(String)}.
     *
     * @throws IllegalStateException if no default table has been set
     */
    public HTable scan( Map<String,Object> args, Closure scanClosure ) throws IOException {
        return this.scan( args, requireDefaultTable(), scanClosure );
    }

    /**
     * Alternate form of {@link #scan(Map,HTable,Closure)} which creates an
     * HTable instance from the <code>tableName</code> string argument. Note
     * that for readability, the table name may be given as the first parameter,
     * i.e. <code>htable.scan( 'myTable', cols: ['col:1'] ) { /* result closure * / }</code>
     */
    public HTable scan( Map<String,Object> args, String tableName, Closure scanClosure ) throws IOException {
        return this.scan( args, new HTable( conf, tableName ), scanClosure );
    }

    /**
     * <p>Create a scanner on the given table passing each {@link RowResult} to
     * the <code>scanClosure</code>. The Scanner is guaranteed to be closed
     * when the scanner iteration completes (either normally or due to an
     * exception). The scan stops early if the closure returns
     * {@link #SCAN_BREAK}.</p>
     *
     * <p>Valid named arguments are:
     * <dl>
     * <dt>cols</dt><dd>(Required) List of columns to return. May be a simple regex pattern</dd>
     * <dt>start</dt><dd>Starting row</dd>
     * <dt>end</dt><dd>Ending row (not passed to the closure)</dd>
     * <dt>timestamp</dt><dd>A <code>long</code>, <code>Date</code> or
     *     <code>Calendar</code>; see {@link #getTimestamp(Object)}</dd>
     * </dl>
     * </p>
     * <p>The <code>cols</code> elements and the <code>start</code> and
     * <code>end</code> values are converted with <code>toString()</code>, so
     * Groovy GStrings are accepted as well as plain Strings.</p>
     * <p>Note that for readability, the table name may be given as the first
     * parameter, i.e.
     * <code>htable.scan( 'myTable', cols: ['col:1'] ) { print it['col:1'] }</code>
     *
     * @return <code>table</code>
     * @throws IllegalArgumentException if <code>cols</code> is missing or the
     *         timestamp has an unsupported type
     * @throws IOException if the scanner cannot be opened or read
     */
    public HTable scan( Map<String,Object> args, HTable table, Closure scanClosure ) throws IOException {
        Object colsArg = args.get("cols");
        if ( colsArg == null ) throw new IllegalArgumentException("'cols' named parameter must be specified");
        List<?> cols = (List<?>)colsArg;
        String[] colArray = new String[cols.size()];
        int idx = 0;
        for ( Object col : cols ) colArray[idx++] = asString( col );

        long timestamp = getTimestamp( args.get("timestamp") );
        String startRow = asString( args.get("start") );
        if ( startRow == null ) startRow = new String(HConstants.EMPTY_START_ROW);
        String endRow = asString( args.get("end") ); // endRow may be null; this is OK

        Scanner scanner = table.getScanner( colArray, startRow, endRow, timestamp );
        int rowCount = 0;
        long ts = System.currentTimeMillis();
        try {
            RowResult row;
            while ( (row = scanner.next()) != null ) {
                rowCount ++;
                // equals() rather than ==, so an equal String that is not the
                // very same instance as the constant still stops the scan.
                if ( SCAN_BREAK.equals( scanClosure.call( row ) ) ) break;
            }
        }
        finally {
            scanner.close();
            ts = System.currentTimeMillis() - ts;
            if ( log.isDebugEnabled() ) {
                log.debug( "Scanned {} rows on table '{}' in {}ms",
                        new Object[] {rowCount, new String(table.getTableName()), ts} );
            }
        }
        return table;
    }

    /**
     * Convert a column value to the bytes stored in HBase.
     *
     * @param val a String, Integer, Long, Date or Calendar; Date and Calendar
     *        are stored as their epoch-millisecond <code>long</code> value
     * @return the encoded value, or <code>null</code> if <code>val</code> is null
     * @throws IllegalArgumentException for any other type
     */
    public static byte[] getBytes( Object val ) {
        if ( val == null ) return null;
        if ( val.getClass() == String.class ) return Bytes.toBytes((String)val);
        if ( val.getClass() == Integer.class ) return Bytes.toBytes((Integer)val);
        if ( val.getClass() == Long.class ) return Bytes.toBytes((Long)val);
        //TODO Double
        if ( val instanceof Date ) return Bytes.toBytes(((Date)val).getTime());
        if ( val instanceof Calendar ) return Bytes.toBytes(((Calendar)val).getTime().getTime());
        throw new IllegalArgumentException("Value must be either a String, Number, Date or Calendar");
    }

    /**
     * Convert a user-supplied timestamp to epoch milliseconds.
     *
     * @param ts <code>null</code>, a Long, an Integer, a Date or a Calendar
     * @return {@link HConstants#LATEST_TIMESTAMP} for <code>null</code>,
     *         otherwise the numeric value or the epoch-millisecond time
     * @throws IllegalArgumentException for any other type
     */
    public static long getTimestamp( Object ts ) {
        if ( ts == null ) return HConstants.LATEST_TIMESTAMP;
        // Widen through Number: casting an Integer straight to Long throws
        // ClassCastException.
        if ( ts instanceof Long || ts instanceof Integer ) return ((Number)ts).longValue();
        if ( ts instanceof Date ) return ((Date)ts).getTime();
        if ( ts instanceof Calendar ) return ((Calendar)ts).getTime().getTime();
        throw new IllegalArgumentException("Timestamp must be either a long, Date or Calendar");
    }

    public void setConf( HBaseConfiguration c ) {
        this.conf = c;
    }

    public HBaseConfiguration getConf() {
        return this.conf;
    }

    /**
     * Return the admin client, creating it from the current configuration on
     * first use.
     */
    public HBaseAdmin getAdmin() throws MasterNotRunningException {
        if ( this.admin == null ) this.admin = new HBaseAdmin( this.conf );
        return this.admin;
    }

    /** Set the name of the 'default' table, i.e. when no table name is given
     * in <code>update</code> and <code>scan</code> operations. A null or empty
     * name clears the default table. The table is opened with this client's
     * configuration when one has been set. */
    public void setTableName( String name ) throws IOException {
        if ( name == null || name.length() < 1 ) {
            this.table = null;
            return;
        }
        // Use this client's configuration so the default table talks to the
        // same master as the explicitly named tables do.
        this.table = this.conf != null ? new HTable( this.conf, name ) : new HTable( name );
    }

    /** Return the default table, or fail with a descriptive error if none is set. */
    private HTable requireDefaultTable() {
        if ( this.table == null ) {
            throw new IllegalStateException(
                    "No default table set; call setTableName(String) or pass a table explicitly" );
        }
        return this.table;
    }

    /** Null-safe <code>toString()</code>, used to accept GStrings as named arguments. */
    private static String asString( Object value ) {
        return value == null ? null : value.toString();
    }
}
/**
 * Closure delegate that supplies the <code>family</code> method used while
 * configuring a table descriptor.
 */
class CreateDelegate {
    HBaseAdmin admin;
    HTableDescriptor table;

    /**
     * @param admin admin client used to modify families that already exist
     * @param table descriptor that families are looked up in and added to
     */
    public CreateDelegate(HBaseAdmin admin, HTableDescriptor table) {
        this.admin = admin; this.table = table;
    }

    /**
     * Configure one column family. An existing family descriptor is reused
     * when the table already has one under this name; otherwise a new
     * descriptor is created. The closure runs with the family descriptor as
     * its only delegate, so assignments inside it resolve against the
     * descriptor.
     *
     * @param familyName family name; a trailing ':' is appended if missing
     * @param familyConfig closure that configures the family descriptor
     * @return the configured family descriptor
     * @throws IOException if modifying an existing column fails
     */
    public HColumnDescriptor family( String familyName, Closure familyConfig ) throws IOException {
        // column family names must end w/ a colon.
        if ( ! familyName.endsWith(":") ) familyName += ':';

        // Encode once with an explicit charset so the lookup key does not
        // depend on the JVM's platform default encoding.
        final byte[] familyKey = familyName.getBytes( "UTF-8" );

        HColumnDescriptor colFamily = table.getFamily( familyKey );
        if ( colFamily == null ) colFamily = new HColumnDescriptor(familyName);

        // Allow the closure to configure column family options:
        familyConfig.setDelegate( colFamily );
        familyConfig.setResolveStrategy( Closure.DELEGATE_ONLY );
        familyConfig.call();

        // New families are added to the descriptor; existing ones are
        // modified through the admin client.
        if ( ! table.hasFamily( familyKey ) ) table.addFamily( colFamily );
        else admin.modifyColumn( table.getNameAsString(), familyName, colFamily );
        return colFamily;
    }
}
/**
 * Closure delegate that supplies the <code>row</code>, <code>family</code>
 * and <code>col</code> methods used inside an update closure. Each
 * <code>row</code> call produces one {@link BatchUpdate}, collected in
 * {@link #getUpdates()}.
 */
class UpdateDelegate {
    // list of BatchUpdates created from calls to row('id') {....}
    List<BatchUpdate> updates = new ArrayList<BatchUpdate>();

    /** @return the updates collected so far, in declaration order */
    public List<BatchUpdate> getUpdates() { return this.updates; }

    /**
     * Current update instance, available for direct access within each 'row'
     * call. This will be null outside of any row closure.
     */
    BatchUpdate currentUpdate;

    String currentFamily = null; // used for family { ... } closure

    /**
     * User-set timestamp that should be set at the beginning of the
     * update { ... } closure. If this is not explicitly set it will
     * default to HConstants.LATEST_TIMESTAMP, which is what BatchUpdate
     * defaults to.
     */
    Object defaultTimestamp = HConstants.LATEST_TIMESTAMP;

    /**
     * Short form for row( rowName, timestamp, rowClosure ) which uses the
     * default timestamp.
     */
    protected void row( String rowName, Closure rowClosure ) {
        row( rowName, this.defaultTimestamp, rowClosure );
    }

    /**
     * Method called within the 'update' closure in order to set values
     * for each row being inserted or updated. This method takes a timestamp
     * parameter that is used for all values set in this row. The parameter may
     * be of type long, Date or Calendar. The current BatchUpdate instance is
     * also passed as a parameter to the rowClosure.
     *
     * <p>The update is recorded only if the closure and the timestamp
     * conversion both complete normally. In every case the delegate leaves
     * the 'inside a row' state when this method exits, so a failed row does
     * not block later row calls.</p>
     *
     * @param rowName the row key
     * @param timestamp Date, Calendar or long timestamp to be used for this row update
     * @param rowClosure closure used to set column values via calls to 'family',
     *        'col' or direct use of the 'currentUpdate' BatchUpdate property.
     * @throws IllegalStateException if called from inside another row closure
     */
    protected void row( String rowName, Object timestamp, Closure rowClosure ) {
        if ( this.currentUpdate != null ) throw new IllegalStateException("Cannot nest row calls");
        this.currentUpdate = new BatchUpdate( rowName );
        try {
            rowClosure.setDelegate( this );
            rowClosure.setResolveStrategy( Closure.DELEGATE_FIRST );
            rowClosure.call( currentUpdate );
            currentUpdate.setTimestamp( HBase.getTimestamp( timestamp ) );
            this.updates.add( this.currentUpdate );
        }
        finally {
            // 'currentUpdate' should not be available outside of row closure,
            // including when the closure threw.
            this.currentUpdate = null;
        }
    }

    /**
     * Method closure used within the row closure to insert several columns in
     * the same column family. All calls to <code>col( 'name', 'val' )</code>
     * within this closure will have the enclosing family prepended to each
     * column name. The family scope ends when this method exits, whether the
     * closure returned normally or threw.
     *
     * @param familyName family name to automatically use within the colClosure
     *        scope. The family name will automatically be appended with ':' if
     *        needed.
     * @param colClosure all <code>col</code> calls within this closure will
     *        automatically be prepended with this family name.
     * @throws IllegalStateException if nested in another family closure or
     *         called outside a row closure
     */
    protected void family(String familyName, Closure colClosure ) {
        if ( currentFamily != null ) throw new IllegalStateException("Cannot nest family calls");
        if ( currentUpdate == null ) throw new IllegalStateException("Family must be called from within a row closure");
        currentFamily = familyName.endsWith(":") ? familyName : (familyName + ':');
        try {
            colClosure.setDelegate( this );
            colClosure.setResolveStrategy( Closure.DELEGATE_FIRST );
            colClosure.call();
        }
        finally {
            // Always leave the family scope, so an exception here cannot make
            // later family calls fail as "nested".
            currentFamily = null;
        }
    }

    /**
     * Method call to set an individual column value. This may be called
     * directly from a row closure, or from a nested family closure. If it
     * is called directly from the row closure, a family-qualified column
     * name must be given (i.e. <code>col( 'family:name', val )</code>). If
     * this is called from a family closure, only the column name should be
     * given, i.e. <code>col( 'name', val )</code>.
     *
     * @param columnName column name, family-qualified unless inside a family closure
     * @param val value, encoded with <code>HBase.getBytes(Object)</code>
     * @throws IllegalStateException if called outside a row closure
     */
    protected void col( String columnName, Object val ) {
        if ( currentUpdate == null ) throw new IllegalStateException(
                "Col must be called from within a row or family closure" );
        if ( currentFamily != null ) columnName = currentFamily + columnName;
        currentUpdate.put( columnName, HBase.getBytes(val) ) ;
    }
}