Author: tedyu
Date: Fri Feb 3 19:02:13 2012
New Revision: 1240294
URL: http://svn.apache.org/viewvc?rev=1240294&view=rev
Log:
HBASE-4658 Put attributes are not exposed via the ThriftServer (Dhruba)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java
hbase/trunk/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1240294&r1=1240293&r2=1240294&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Fri Feb 3 19:02:13 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -113,8 +114,8 @@ public class HRegionThriftServer extends
public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName,
ByteBuffer rowb,
List<ByteBuffer> columns,
- long timestamp)
- throws IOError {
+ long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
byte [] row = rowb.array();
HTable table = getTable(tableName.array());
@@ -146,7 +147,8 @@ public class HRegionThriftServer extends
throw new IOError(e.getMessage());
}
LOG.debug("ThriftServer redirecting getRowWithColumnsTs");
- return super.getRowWithColumnsTs(tableName, rowb, columns, timestamp);
+ return super.getRowWithColumnsTs(tableName, rowb, columns, timestamp,
+ attributes);
} catch (IOException e) {
throw new IOError(e.getMessage());
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1240294&r1=1240293&r2=1240294&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Fri Feb 3 19:02:13 2012
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -581,22 +583,25 @@ public class ThriftServerRunner implemen
@Deprecated
@Override
public List<TCell> get(
- ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
+ ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError {
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
if(famAndQf.length == 1) {
- return get(tableName, row, famAndQf[0], new byte[0]);
+ return get(tableName, row, famAndQf[0], new byte[0], attributes);
}
- return get(tableName, row, famAndQf[0], famAndQf[1]);
+ return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
}
protected List<TCell> get(ByteBuffer tableName,
ByteBuffer row,
byte[] family,
- byte[] qualifier) throws IOError {
+ byte[] qualifier,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
HTable table = getTable(tableName);
Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
if (qualifier == null || qualifier.length == 0) {
get.addFamily(family);
} else {
@@ -612,22 +617,25 @@ public class ThriftServerRunner implemen
@Deprecated
@Override
public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
- ByteBuffer column, int numVersions) throws IOError {
+ ByteBuffer column, int numVersions,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
if(famAndQf.length == 1) {
return getVer(tableName, row, famAndQf[0],
- new byte[0], numVersions);
+ new byte[0], numVersions, attributes);
}
return getVer(tableName, row,
- famAndQf[0], famAndQf[1], numVersions);
+ famAndQf[0], famAndQf[1], numVersions, attributes);
}
public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
byte[] family,
- byte[] qualifier, int numVersions) throws IOError {
+ byte[] qualifier, int numVersions,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
HTable table = getTable(tableName);
Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
get.addColumn(family, qualifier);
get.setMaxVersions(numVersions);
Result result = table.get(get);
@@ -643,22 +651,25 @@ public class ThriftServerRunner implemen
ByteBuffer row,
ByteBuffer column,
long timestamp,
- int numVersions) throws IOError {
+ int numVersions,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
if(famAndQf.length == 1) {
return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
- numVersions);
+ numVersions, attributes);
}
return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
- numVersions);
+ numVersions, attributes);
}
protected List<TCell> getVerTs(ByteBuffer tableName,
ByteBuffer row, byte [] family,
- byte [] qualifier, long timestamp, int numVersions) throws IOError {
+ byte [] qualifier, long timestamp, int numVersions,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
HTable table = getTable(tableName);
Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
get.addColumn(family, qualifier);
get.setTimeRange(Long.MIN_VALUE, timestamp);
get.setMaxVersions(numVersions);
@@ -670,40 +681,45 @@ public class ThriftServerRunner implemen
}
@Override
- public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row)
- throws IOError {
+ public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
return getRowWithColumnsTs(tableName, row, null,
- HConstants.LATEST_TIMESTAMP);
+ HConstants.LATEST_TIMESTAMP,
+ attributes);
}
@Override
public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
ByteBuffer row,
- List<ByteBuffer> columns) throws IOError {
+ List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
return getRowWithColumnsTs(tableName, row, columns,
- HConstants.LATEST_TIMESTAMP);
+ HConstants.LATEST_TIMESTAMP,
+ attributes);
}
@Override
public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
- long timestamp) throws IOError {
+ long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
return getRowWithColumnsTs(tableName, row, null,
- timestamp);
+ timestamp, attributes);
}
@Override
public List<TRowResult> getRowWithColumnsTs(
ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
- long timestamp) throws IOError {
+ long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
HTable table = getTable(tableName);
if (columns == null) {
Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
get.setTimeRange(Long.MIN_VALUE, timestamp);
Result result = table.get(get);
return ThriftUtilities.rowResultFromHBase(result);
}
Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
for(ByteBuffer column : columns) {
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
if (famAndQf.length == 1) {
@@ -722,37 +738,44 @@ public class ThriftServerRunner implemen
@Override
public List<TRowResult> getRows(ByteBuffer tableName,
- List<ByteBuffer> rows)
+ List<ByteBuffer> rows,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError {
return getRowsWithColumnsTs(tableName, rows, null,
- HConstants.LATEST_TIMESTAMP);
+ HConstants.LATEST_TIMESTAMP,
+ attributes);
}
@Override
public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
List<ByteBuffer> rows,
- List<ByteBuffer> columns) throws IOError {
+ List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
return getRowsWithColumnsTs(tableName, rows, columns,
- HConstants.LATEST_TIMESTAMP);
+ HConstants.LATEST_TIMESTAMP,
+ attributes);
}
@Override
public List<TRowResult> getRowsTs(ByteBuffer tableName,
List<ByteBuffer> rows,
- long timestamp) throws IOError {
+ long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
return getRowsWithColumnsTs(tableName, rows, null,
- timestamp);
+ timestamp, attributes);
}
@Override
public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
List<ByteBuffer> rows,
- List<ByteBuffer> columns, long timestamp) throws IOError {
+ List<ByteBuffer> columns, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
List<Get> gets = new ArrayList<Get>(rows.size());
HTable table = getTable(tableName);
for (ByteBuffer row : rows) {
Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
if (columns != null) {
for(ByteBuffer column : columns) {
@@ -776,19 +799,22 @@ public class ThriftServerRunner implemen
@Override
public void deleteAll(
- ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
+ ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError {
- deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
+ deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
+ attributes);
}
@Override
public void deleteAllTs(ByteBuffer tableName,
ByteBuffer row,
ByteBuffer column,
- long timestamp) throws IOError {
+ long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
HTable table = getTable(tableName);
Delete delete = new Delete(getBytes(row));
+ addAttributes(delete, attributes);
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
if (famAndQf.length == 1) {
delete.deleteFamily(famAndQf[0], timestamp);
@@ -804,16 +830,19 @@ public class ThriftServerRunner implemen
@Override
public void deleteAllRow(
- ByteBuffer tableName, ByteBuffer row) throws IOError {
- deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP);
+ ByteBuffer tableName, ByteBuffer row,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
}
@Override
public void deleteAllRowTs(
- ByteBuffer tableName, ByteBuffer row, long timestamp) throws IOError {
+ ByteBuffer tableName, ByteBuffer row, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
HTable table = getTable(tableName);
Delete delete = new Delete(getBytes(row), timestamp, null);
+ addAttributes(delete, attributes);
table.delete(delete);
} catch (IOException e) {
throw new IOError(e.getMessage());
@@ -860,20 +889,25 @@ public class ThriftServerRunner implemen
@Override
public void mutateRow(ByteBuffer tableName, ByteBuffer row,
- List<Mutation> mutations) throws IOError, IllegalArgument {
- mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP);
+ List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError, IllegalArgument {
+ mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
+ attributes);
}
@Override
public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
- List<Mutation> mutations, long timestamp)
+ List<Mutation> mutations, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, IllegalArgument {
HTable table = null;
try {
table = getTable(tableName);
Put put = new Put(getBytes(row), timestamp, null);
+ addAttributes(put, attributes);
Delete delete = new Delete(getBytes(row));
+ addAttributes(delete, attributes);
// I apologize for all this mess :)
for (Mutation m : mutations) {
@@ -910,14 +944,16 @@ public class ThriftServerRunner implemen
}
@Override
- public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches)
+ public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, IllegalArgument, TException {
- mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
+ mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
}
@Override
public void mutateRowsTs(
- ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp)
+ ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, IllegalArgument, TException {
List<Put> puts = new ArrayList<Put>();
List<Delete> deletes = new ArrayList<Delete>();
@@ -926,7 +962,9 @@ public class ThriftServerRunner implemen
byte[] row = getBytes(batch.row);
List<Mutation> mutations = batch.mutations;
Delete delete = new Delete(row);
+ addAttributes(delete, attributes);
Put put = new Put(row, timestamp, null);
+ addAttributes(put, attributes);
for (Mutation m : mutations) {
byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
if (m.isDelete) {
@@ -1033,11 +1071,13 @@ public class ThriftServerRunner implemen
return scannerGetList(id,1);
}
- public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan)
+ public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError {
try {
HTable table = getTable(tableName);
Scan scan = new Scan();
+ addAttributes(scan, attributes);
if (tScan.isSetStartRow()) {
scan.setStartRow(tScan.getStartRow());
}
@@ -1073,10 +1113,12 @@ public class ThriftServerRunner implemen
@Override
public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
- List<ByteBuffer> columns) throws IOError {
+ List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try {
HTable table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow));
+ addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) {
for(ByteBuffer column : columns) {
byte [][] famQf = KeyValue.parseColumn(getBytes(column));
@@ -1095,11 +1137,13 @@ public class ThriftServerRunner implemen
@Override
public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
- ByteBuffer stopRow, List<ByteBuffer> columns)
+ ByteBuffer stopRow, List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
try {
HTable table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+ addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) {
for(ByteBuffer column : columns) {
byte [][] famQf = KeyValue.parseColumn(getBytes(column));
@@ -1119,11 +1163,13 @@ public class ThriftServerRunner implemen
@Override
public int scannerOpenWithPrefix(ByteBuffer tableName,
ByteBuffer startAndPrefix,
- List<ByteBuffer> columns)
+ List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
try {
HTable table = getTable(tableName);
Scan scan = new Scan(getBytes(startAndPrefix));
+ addAttributes(scan, attributes);
Filter f = new WhileMatchFilter(
new PrefixFilter(getBytes(startAndPrefix)));
scan.setFilter(f);
@@ -1145,10 +1191,12 @@ public class ThriftServerRunner implemen
@Override
public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
- List<ByteBuffer> columns, long timestamp) throws IOError, TException {
+ List<ByteBuffer> columns, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
try {
HTable table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow));
+ addAttributes(scan, attributes);
scan.setTimeRange(Long.MIN_VALUE, timestamp);
if (columns != null && columns.size() != 0) {
for (ByteBuffer column : columns) {
@@ -1168,11 +1216,13 @@ public class ThriftServerRunner implemen
@Override
public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
- ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp)
+ ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
try {
HTable table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+ addAttributes(scan, attributes);
scan.setTimeRange(Long.MIN_VALUE, timestamp);
if (columns != null && columns.size() != 0) {
for (ByteBuffer column : columns) {
@@ -1266,4 +1316,18 @@ public class ThriftServerRunner implemen
}
}
}
+ /**
+ * Copies every entry of attributes onto op via setAttribute; a null or empty map is a no-op.
+ */
+ private static void addAttributes(OperationWithAttributes op,
+ Map<ByteBuffer, ByteBuffer> attributes) {
+ if (attributes == null || attributes.size() == 0) {
+ return; // nothing to copy
+ }
+ for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
+ String name = Bytes.toStringBinary(entry.getKey()); // map key becomes the attribute name
+ byte[] value = Bytes.toBytes(entry.getValue()); // NOTE(review): other ByteBuffer conversions in this patch use getBytes(...) - confirm toBytes is intended here
+ op.setAttribute(name, value);
+ }
+ }
}