I have written coprocessor code with below code. After compiling this code,
I have created the jar file. And then added this jar as a coprocessor to
the table.
alter 'SOURCE_TBL' , METHOD => 'table_att' , 'COPROCESSOR'=>
'hdfs:///apps/stsuid/HBASESamples-0.0.1-SNAPSHOT.jar|com.hbase.coprocessor.PutRegionObserverExample|1001'
After this, when I am doing put operation on source table 'SOURCE_TBL'
mentioned as below, I am expecting a record in index table 'INDEX_TBL' but
there are no records in INDEX_TBL.
put 'SOURCE_TBL' , '123' , 'data:CUST_ID' , 'RFRT'
Please help me and let me know if my code of coprocessor is wrong or am I
executing it in a wrong way.
package com.hbase.coprocessor;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
public class PutRegionObserverExample extends BaseRegionObserver {
public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");
private final static Logger LOG =
Logger.getLogger(PutRegionObserverExample.class.getName());
private HTablePool pool = null;
private final static String INDEX_TABLE = "INDEX_TBL";
private final static String SOURCE_TABLE = "SOURCE_TBL";
public void start(CoprocessorEnvironment env) throws IOException {
LOG.info("(start)");
pool = new HTablePool(env.getConfiguration(), 10);
}
//@Override
public void postPut(
final ObserverContext<RegionCoprocessorEnvironment> observerContext,
final Put put,
final WALEdit edit,
final boolean writeToWAL)
throws IOException {
byte[] table =
observerContext.getEnvironment().getRegion().getRegionInfo().getTableName();
if (!Bytes.equals(table, Bytes.toBytes(SOURCE_TABLE))) {
LOG.info("SOURCE_TABLE");
return;
}
try {
final List<Cell> filteredList = put.get(Bytes.toBytes
("data"),Bytes.toBytes("CUST_ID"));
byte [] id = put.getRow(); //Get the Entry ID
Cell kv=filteredList.get( 0 ); //get Entry PARENT_ID
byte[] parentId = kv.getValue();
HTableInterface htbl = pool.getTable(Bytes.toBytes(INDEX_TABLE));
//create row key for the index table
byte[] p1 = concatTwoByteArrays(parentId, ":".getBytes());
//Insert a semicolon between two UUIDs
byte[] rowkey = concatTwoByteArrays(p1, id);
Put indexput = new Put(rowkey);
indexput.add(Bytes.toBytes("data"), Bytes.toBytes("CUST_ID"),
Bytes.toBytes("tgt9"));
htbl.put(indexput);
htbl.close();
} catch ( IllegalArgumentException ex) {
// handle excepion.
LOG.info(ex.getMessage());
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
pool.close();
}
public static final byte[] concatTwoByteArrays(byte[] first, byte[]
second) {
byte[] result = Arrays.copyOf(first, first.length + second.length);
System.arraycopy(second, 0, result, first.length, second.length);
return result;
}
}