Hi --

I've created a builder-style HBase client DSL for Groovy -- currently
it just wraps the client API to make inserts and row scans a little
easier.  There's probably plenty of room for improvement, so I wanted to
submit it to the community.  Any feedback or suggestions are welcome.

Here's an example:

def hbase = HBase.connect() // may optionally pass host name
/* Create:  this will create a table if it does not exist, or disable
   & update column families if the table already does exist.  The table
   will be enabled when the create statement returns */
hbase.create( 'myTable' ) {
  family( 'familyOne' ) {
    inMemory = true
    bloomFilter = false
  }
}

/* Insert/update rows: */
hbase.update( 'myTable' ) {
  row( 'rowOne' ) {
    family( 'familyOne' ) {
      col 'one', 'someValue'
      col 'two', 'anotherValue'
      col 'three', 1234
    }
    // alternate form that doesn't use nested family name:
    col 'familyOne:four', 12345
  }
  row( 'rowTwo' ) { /* more column values */ }
  // etc
}

So a more realistic example -- iterating through some data and
inserting it -- would look like this:

hbase.update( 'myTable' ) {
        new URL( someCSV ).eachLine { line ->
                def values = line.split(',')    
                row( values[0] ) {
                        col 'fam1:val1', values[1]
                        // etc...       
                }
        }
}

There is also a wrapper for the scanner API:

hbase.scan( cols : ['fam:col1', 'fam:col2'],
            start : '001', end : '200',
            // any timestamp args may be long, Date or Calendar
            timestamp : Date.parse( 'yy/MM/dd HH:mm:ss', '08/11/25 05:00:00' )
            ) { row ->

        // each row is a RowResult instance -- which is a Map!  So all map
        // operations are valid here:
  row.each { println "${it.key} : ${it.value}" }
}
/*
 * Copyright 2003-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.enernoc.rnd.eps.groovy

import groovy.lang.Closure;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Builder-style HBase client DSL for Groovy.
 *
 * <p>Instances are obtained through one of the static {@code connect}
 * factories.  A "default" table may be configured with
 * {@link #setTableName(String)} so that {@code update} and {@code scan}
 * can be called without naming a table.</p>
 *
 * @author Tom Nichols ([EMAIL PROTECTED])
 */
public class HBase {

	protected Logger log = LoggerFactory.getLogger(getClass());

	/**
	 * Sentinel a scan closure may return to stop the scan early.  The closure
	 * result is compared by value ({@code equals}), not by identity.
	 */
	public static final String SCAN_BREAK = "com.enernoc.rnd.eps.SCANNER_BREAK";

	HBaseConfiguration conf;
	HBaseAdmin admin;

	/** Default table to use for update/ scan operations; may be null. */
	HTable table;

	protected HBase() {}

	/** Connect using a default {@link HBaseConfiguration}. */
	public static HBase connect() {
		HBase hb = new HBase();
		hb.setConf( new HBaseConfiguration() );
		return hb;
	}

	/**
	 * Connect using a configuration whose {@link HConstants#MASTER_ADDRESS}
	 * is set to {@code host}.
	 */
	public static HBase connect( String host ) {
		HBaseConfiguration conf = new HBaseConfiguration();
		conf.set( HConstants.MASTER_ADDRESS, host );
		HBase hb = new HBase();
		hb.setConf( conf );
		return hb;
	}

	/**
	 * Named-argument form of connect.  Recognized keys: {@code host}
	 * (optional master address) and {@code table} (optional default table).
	 */
	public static HBase connect( Map<String, Object> args ) throws IOException {
		String host = (String)args.get("host");
		HBase hb = host != null ? HBase.connect( host ) : HBase.connect();
		hb.setTableName( (String)args.get("table") );
		return hb;
	}

	/**
	 * Create a table, or update the column families of an existing one.
	 *
	 * <p>An existing table is disabled before {@code tableConfig} runs.  The
	 * closure is invoked with the table descriptor as its argument and a
	 * {@link CreateDelegate} as its delegate.  Afterwards the descriptor is
	 * written back (existing table) or the table is created (new table).  The
	 * table is re-enabled before this method returns, including when the
	 * closure or the update fails.</p>
	 *
	 * @param tableName name of the table to create or modify
	 * @param tableConfig configuration closure; may be null to use defaults
	 * @return the descriptor that was created or updated
	 * @throws IOException if the table exists but its descriptor cannot be
	 *   found, or on any admin failure
	 */
	public HTableDescriptor create( String tableName, Closure tableConfig ) throws MasterNotRunningException, IOException {
		HBaseAdmin hba = getAdmin();
		HTableDescriptor table;

		if ( hba.tableExists( tableName ) ) {
			table = findTableDescriptor( hba, tableName );
			log.info( "Taking table {} offline for modification", tableName );

			/* table needs to be disabled before calling the create delegate
			   where family configuration is done on a live table descriptor */
			if ( hba.isTableEnabled( tableName ) ) hba.disableTable( tableName );
		}
		else {
			log.info( "Creating table '{}'...", tableName );
			table = new HTableDescriptor( tableName );
		}

		try {
			if ( tableConfig != null ) {
				tableConfig.setDelegate( new CreateDelegate( hba, table ) );
				tableConfig.setResolveStrategy( Closure.DELEGATE_FIRST );
				tableConfig.call( table );
			}

			if ( hba.tableExists( tableName ) ) {
				// Bytes.toBytes matches the UTF-8 encoding HBase uses for names.
				hba.modifyTableMeta( Bytes.toBytes( tableName ), table );
				log.info( "Updated table: {}", table );
			}
			else {
				hba.createTable( table );
				log.info( "Created table: {}", table );
			}
		}
		finally {
			// Never leave an existing table offline, even if configuration failed.
			if ( hba.tableExists( tableName ) && ! hba.isTableEnabled( tableName ) ) {
				hba.enableTable( tableName );
				log.debug( "Enabled table: {}", tableName );
			}
		}
		return table;
	}

	/** Find the descriptor of an existing table, or fail with a clear message. */
	private HTableDescriptor findTableDescriptor( HBaseAdmin hba, String tableName ) throws IOException {
		for ( HTableDescriptor td : hba.listTables() ) {
			if ( td.getNameAsString().equals( tableName ) ) return td;
		}
		throw new IOException( "Table '" + tableName + "' exists but its descriptor could not be found" );
	}

	/** Run an update against the default table (see {@link #setTableName(String)}). */
	public HTable update( Closure updateClosure ) throws IOException {
		return this.update( requireDefaultTable(), updateClosure );
	}

	/** Run an update against a table opened by name with this connection's configuration. */
	public HTable update( String tableName, Closure updateClosure ) throws IOException {
		return this.update( new HTable(conf, tableName), updateClosure );
	}

	/**
	 * Run {@code updateClosure} with an {@link UpdateDelegate} as its delegate,
	 * then commit every {@link BatchUpdate} it collected.  Nothing is committed
	 * if the closure throws.
	 *
	 * @return {@code table}, for chaining
	 */
	public HTable update( HTable table, Closure updateClosure ) throws IOException {
		UpdateDelegate delegate = new UpdateDelegate();
		updateClosure.setDelegate( delegate );
		updateClosure.setResolveStrategy( Closure.DELEGATE_FIRST );
		updateClosure.call( table );

		String tableName = new String( table.getTableName() );
		for ( BatchUpdate update : delegate.getUpdates() ) {
			table.commit( update );
			log.debug( "Committed update to table '{}' : {}", tableName, update );
		}
		return table;
	}

	/**
	 * Short form of {@link #scan(Map,HTable,Closure)}.  This method uses
	 * the default table defined in {@link #setTableName(String)}.
	 *
	 * @throws IllegalStateException if no default table has been set
	 */
	public HTable scan( Map<String,Object> args, Closure scanClosure ) throws IOException {
		return this.scan( args, requireDefaultTable(), scanClosure );
	}

	/**
	 * Alternate form of {@link #scan(Map,HTable,Closure)} which creates an
	 * HTable instance from the <code>tableName</code> string argument.  Note
	 * that for readability, the table name may be given as the first parameter,
	 * i.e. <code>htable.scan( 'myTable', cols: ['col:1'] ) { /* result closure * / }</code>
	 */
	public HTable scan( Map<String,Object> args, String tableName, Closure scanClosure ) throws IOException {
		return this.scan( args, new HTable( conf, tableName ), scanClosure );
	}

	/**
	 * <p>Create a scanner on the given table passing each {@link RowResult} to
	 * the <code>scanClosure</code>.  The Scanner is guaranteed to be closed
	 * when the scanner iteration completes (either normally or due to an
	 * exception).  If the closure returns {@link #SCAN_BREAK} the scan stops.</p>
	 *
	 * <p>Valid named arguments are:
	 * <dl>
	 *   <dt>cols</dt><dd>(Required) List of columns to return.  May be a simple regex pattern</dd>
	 *   <dt>start</dt><dd>Starting row</dd>
	 *   <dt>end</dt><dd>Ending row (not passed to the closure)</dd>
	 *   <dt>timestamp</dt><dd>A <code>Date</code>, <code>Calendar</code>, <code>long</code> or <code>int</code></dd>
	 * </dl>
	 * </p>
	 * <p>Note that for readability, the table name may be given as the first
	 * parameter, i.e.
	 * <code>htable.scan( 'myTable', cols: ['col:1'] ) { print it['col:1'] }</code>
	 */
	public HTable scan( Map<String,Object> args, HTable table, Closure scanClosure ) throws IOException {
		@SuppressWarnings("unchecked")
		List<String> cols = (List<String>)args.get("cols");
		if ( cols == null ) throw new IllegalArgumentException("'cols' named parameter must be specified");

		long timestamp = getTimestamp( args.get("timestamp") );

		String startRow = (String)args.get("start");
		if ( startRow == null ) startRow = new String(HConstants.EMPTY_START_ROW);
		String endRow = (String)args.get("end"); // endRow may be null; this is OK
		Scanner scanner = table.getScanner( cols.toArray( new String[cols.size()] ), startRow, endRow, timestamp );

		int rowCount = 0;
		long started = System.currentTimeMillis();
		try {
			RowResult row;
			while ( ( row = scanner.next() ) != null ) {
				rowCount++;
				// Compare by value: an equal String that is a different instance
				// must still stop the scan.
				if ( SCAN_BREAK.equals( scanClosure.call( row ) ) ) break;
			}
		}
		finally {
			scanner.close();
			if ( log.isDebugEnabled() ) {
				long elapsed = System.currentTimeMillis() - started;
				log.debug( "Scanned {} rows on table '{}' in {}ms",
					new Object[] { rowCount, new String(table.getTableName()), elapsed } );
			}
		}
		return table;
	}

	/**
	 * Convert a value to the byte form stored in a cell.
	 *
	 * @param val a byte[] (returned as-is), String, Integer, Long, Date or
	 *   Calendar (Date/Calendar are stored as epoch millis); may be null
	 * @return the encoded bytes, or null if {@code val} is null
	 * @throws IllegalArgumentException for any other type
	 */
	public static byte[] getBytes( Object val ) {
		if ( val == null ) return null;
		if ( val instanceof byte[] ) return (byte[])val;
		if ( val instanceof String ) return Bytes.toBytes((String)val);
		if ( val instanceof Integer ) return Bytes.toBytes((Integer)val);
		if ( val instanceof Long ) return Bytes.toBytes((Long)val);
		//TODO Double

		if ( val instanceof Date ) return Bytes.toBytes(((Date)val).getTime());
		if ( val instanceof Calendar ) return Bytes.toBytes(((Calendar)val).getTime().getTime());

		throw new IllegalArgumentException("Value must be a byte[], String, Integer, Long, Date or Calendar, not "
				+ val.getClass().getName());
	}

	/**
	 * Convert a timestamp argument to epoch millis.
	 *
	 * @param ts a Long, Integer, Date or Calendar; null means
	 *   {@link HConstants#LATEST_TIMESTAMP}
	 * @throws IllegalArgumentException for any other type
	 */
	public static long getTimestamp( Object ts ) {
		if ( ts == null ) return HConstants.LATEST_TIMESTAMP;
		// Widen via Number: casting an Integer to Long would throw ClassCastException.
		if ( ts instanceof Long || ts instanceof Integer ) return ((Number)ts).longValue();

		if ( ts instanceof Date ) return ((Date)ts).getTime();
		if ( ts instanceof Calendar ) return ((Calendar)ts).getTime().getTime();

		throw new IllegalArgumentException("Timestamp must be either a long, Date or Calendar");
	}

	public void setConf( HBaseConfiguration c ) {
		this.conf = c;
	}

	public HBaseConfiguration getConf() {
		return this.conf;
	}

	/** Lazily create and cache an admin client for this configuration. */
	public HBaseAdmin getAdmin() throws MasterNotRunningException {
		if ( this.admin == null ) this.admin = new HBaseAdmin( this.conf );
		return this.admin;
	}

	/** Set the name of the 'default' table, i.e. when no table name is given
	 * in <code>update</code> and <code>scan</code> operations.  A null or
	 * empty name clears the default.  The table is opened with this
	 * connection's configuration (e.g. the host given to {@code connect}). */
	public void setTableName( String name ) throws IOException {
		if ( name == null || name.length() < 1 ) {
			this.table = null;
			return;
		}
		this.table = new HTable( this.conf, name );
	}

	/** Return the default table, failing clearly if none has been configured. */
	private HTable requireDefaultTable() {
		if ( this.table == null ) throw new IllegalStateException(
				"No default table set; call setTableName() or pass a table name" );
		return this.table;
	}
}

/**
 * Delegate for the closure passed to {@code HBase.create}; exposes
 * {@code family(...)} calls that add or modify column families on the
 * table descriptor being configured.
 */
class CreateDelegate {
	HBaseAdmin admin;
	HTableDescriptor table;

	public CreateDelegate(HBaseAdmin admin, HTableDescriptor table) {
		this.admin = admin; this.table = table;
	}

	/** Add (or keep) a column family using default settings. */
	public HColumnDescriptor family( String familyName ) throws IOException {
		return family( familyName, null );
	}

	/**
	 * Add or modify a column family.  The optional {@code familyConfig}
	 * closure runs with the {@link HColumnDescriptor} as its only delegate,
	 * so e.g. {@code inMemory = true} sets that descriptor property.
	 *
	 * @param familyName family name; ':' is appended if missing
	 * @param familyConfig configuration closure, or null for defaults
	 * @return the added or modified column descriptor
	 */
	public HColumnDescriptor family( String familyName, Closure familyConfig ) throws IOException {
		// column family names must end w/ a colon.
		String name = familyName.endsWith(":") ? familyName : familyName + ':';
		// Encode with Bytes (UTF-8) to match how the descriptor stores names;
		// String.getBytes() would use the platform charset.
		byte[] nameBytes = Bytes.toBytes( name );

		HColumnDescriptor colFamily = table.getFamily( nameBytes );
		if ( colFamily == null ) colFamily = new HColumnDescriptor( name );

		// Allow closure arg to configure column family options:
		if ( familyConfig != null ) {
			familyConfig.setDelegate( colFamily );
			familyConfig.setResolveStrategy( Closure.DELEGATE_ONLY );
			familyConfig.call();
		}

		// Add column to table, or push changes for an existing family.
		if ( ! table.hasFamily( nameBytes ) ) table.addFamily( colFamily );
		else admin.modifyColumn( table.getNameAsString(), name, colFamily );
		return colFamily;
	}
}

/**
 * Delegate for the closure passed to {@code HBase.update}.  Collects one
 * {@link BatchUpdate} per {@code row(...)} call; the caller commits them.
 */
class UpdateDelegate {
	 // list of BatchUpdates created from calls to row('id') {....}
	List<BatchUpdate> updates = new ArrayList<BatchUpdate>();
	public List<BatchUpdate> getUpdates() { return this.updates; }
	/**
	 * Current update instance, available for direct access within each 'row'
	 * call.  This will be null outside of any row closure.
	 */
	BatchUpdate currentUpdate;
	String currentFamily = null; // used for family { ... } closure
	/**
	 * User-set timestamp that should be set at the beginning of the
	 * update { ... } closure.  If this is not explicitly set it will
	 * default to HConstants.LATEST_TIMESTAMP, which is what BatchUpdate
	 * defaults to.
	 */
	Object defaultTimestamp = HConstants.LATEST_TIMESTAMP;

	/**
	 * Short form for row( rowName, timestamp, rowClosure ) which uses the
	 * default timestamp.
	 */
	protected void row( String rowName, Closure rowClosure ) {
		row( rowName, this.defaultTimestamp, rowClosure );
	}

	/**
	 * Method called within the 'update' closure in order to set values
	 * for each row being inserted or updated.  This method takes a timestamp
	 * parameter that is used for all values set in this row.  The parameter may
	 * be of type long, Date or Calendar.  The current BatchUpdate instance is
	 * also passed as a parameter to the rowClosure.  If the closure throws,
	 * the row is discarded and the delegate is left ready for the next row.
	 * @param rowName the row key
	 * @param timestamp Date, Calendar or long timestamp to be used for this row update
	 * @param rowClosure closure used to set column values via calls to 'family',
	 *   'col' or direct use of the 'currentUpdate' BatchUpdate property.
	 * @throws IllegalStateException if called from inside another row closure
	 * @throws IllegalArgumentException if the timestamp type is unsupported
	 */
	protected void row( String rowName, Object timestamp, Closure rowClosure ) {
		if ( this.currentUpdate != null ) throw new IllegalStateException("Cannot nest row calls");

		// Validate the timestamp before running any user code for this row.
		long ts = HBase.getTimestamp( timestamp );

		this.currentUpdate = new BatchUpdate( rowName );
		BatchUpdate update;
		try {
			rowClosure.setDelegate( this );
			rowClosure.setResolveStrategy( Closure.DELEGATE_FIRST );
			rowClosure.call( currentUpdate );
			update = this.currentUpdate;
		}
		finally {
			// 'currentUpdate' must not survive the row closure, even if it threw;
			// otherwise every later row() call would report illegal nesting.
			this.currentUpdate = null;
		}

		update.setTimestamp( ts );
		this.updates.add( update );
	}

	/**
	 * Method closure used within the row closure to insert several columns in
	 * the same column family. All calls to <code>col( 'name', 'val' )</code>
	 * within this closure will have the enclosing family prepended to each
	 * column name.
	 *
	 * @param familyName family name to automatically use within the colClosure
	 *   scope.  The family name will automatically be appended with ':' if
	 *   needed.
	 * @param colClosure all <code>col</code> calls within this closure will
	 *   automatically be prepended with this family name.
	 * @throws IllegalStateException if nested in another family closure or
	 *   called outside a row closure
	 */
	protected void family(String familyName, Closure colClosure ) {
		if ( currentFamily != null ) throw new IllegalStateException("Cannot nest family calls");
		if ( currentUpdate == null ) throw new IllegalStateException("Family must be called from within a row closure");

		currentFamily = familyName.endsWith(":") ? familyName : (familyName + ':');
		try {
			colClosure.setDelegate( this );
			colClosure.setResolveStrategy( Closure.DELEGATE_FIRST );
			colClosure.call();
		}
		finally {
			// Always clear, so a failed family closure doesn't poison later calls.
			currentFamily = null;
		}
	}

	/**
	 * Method call to set an individual column value.  This may be called
	 * directly from a row closure, or from a nested family closure.  If it
	 * is called directly from the row closure, a family-qualified column
	 * name must be given (i.e. <code>col( 'family:name', val )</code>).  If
	 * this is called from a family closure, only the column name should be
	 * given, i.e. <code>col( 'name', val )</code>.  The value is encoded
	 * with <code>HBase.getBytes</code>.
	 */
	protected void col( String columnName, Object val ) {
		if ( currentUpdate == null ) throw new IllegalStateException(
				"Col must be called from within a row or family closure" );
		String qualified = currentFamily != null ? currentFamily + columnName : columnName;
		currentUpdate.put( qualified, HBase.getBytes(val) ) ;
	}
}

Reply via email to