Extends PhoenixTransactionalTable to inherit from HTableInterface and implements the needed methods in TephraTransactionTable
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d2c16533 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d2c16533 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d2c16533 Branch: refs/heads/omid Commit: d2c1653309fa97974b1c8bce3352cfc54d180567 Parents: c451343 Author: Ohad Shacham <[email protected]> Authored: Tue Mar 7 11:57:42 2017 +0200 Committer: Ohad Shacham <[email protected]> Committed: Tue Mar 7 11:57:42 2017 +0200 ---------------------------------------------------------------------- .../transaction/OmidTransactionTable.java | 196 ++++++++++++++++++- .../transaction/PhoenixTransactionalTable.java | 2 +- .../transaction/TephraTransactionTable.java | 173 ++++++++++++++++ 3 files changed, 365 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d2c16533/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java index 725fe16..d2cd020 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java @@ -2,16 +2,32 @@ package org.apache.phoenix.transaction; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import 
org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; public class OmidTransactionTable implements PhoenixTransactionalTable { @@ -89,19 +105,16 @@ public class OmidTransactionTable implements PhoenixTransactionalTable { @Override public void put(List<Put> puts) throws IOException { // TODO Auto-generated method stub - } @Override public void delete(List<Delete> deletes) throws IOException { // TODO Auto-generated method stub - } @Override public void setAutoFlush(boolean autoFlush) { // TODO Auto-generated method stub - } @Override @@ -119,19 +132,192 @@ public class OmidTransactionTable implements PhoenixTransactionalTable { @Override public void setWriteBufferSize(long writeBufferSize) throws IOException { // TODO Auto-generated method stub - } @Override public void flushCommits() throws IOException { // TODO Auto-generated method stub - } @Override public void close() throws IOException { // TODO Auto-generated method stub + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Boolean[] exists(List<Get> gets) throws IOException { + // TODO 
Auto-generated method stub + return null; + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + // TODO Auto-generated method stub + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + // TODO Auto-generated method stub + } + + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public TableName getName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean[] existsAll(List<Get> gets) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) + throws IOException, InterruptedException { + // TODO Auto-generated method stub + } + + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public <R> void batchCallback(List<? extends Row> actions, + Object[] results, Callback<R> callback) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + } + + @Override + public <R> Object[] batchCallback(List<? 
extends Row> actions, + Callback<R> callback) throws IOException, InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public Result append(Append append) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result increment(Increment increment) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, Durability durability) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + // TODO Auto-generated method stub + return null; + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService( + Class<T> service, byte[] startKey, 
byte[] endKey, + Call<T, R> callable) throws ServiceException, Throwable { + // TODO Auto-generated method stub + return null; + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, + byte[] startKey, byte[] endKey, Call<T, R> callable, + Callback<R> callback) throws ServiceException, Throwable { + // TODO Auto-generated method stub + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype) + throws ServiceException, Throwable { + // TODO Auto-generated method stub + return null; + } + + @Override + public <R extends Message> void batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, + Callback<R> callback) throws ServiceException, Throwable { + // TODO Auto-generated method stub + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) + throws IOException { + // TODO Auto-generated method stub + return false; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d2c16533/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java index 3a43068..dcab73d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java @@ -14,7 +14,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import java.io.IOException; import java.util.List; -public interface PhoenixTransactionalTable { +public interface 
PhoenixTransactionalTable extends HTableInterface { /** * Transaction version of {@link HTableInterface#get(Get get)} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d2c16533/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java index 0823f89..50ea600 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java @@ -2,18 +2,34 @@ package org.apache.phoenix.transaction; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.tephra.hbase.TransactionAwareHTable; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import 
com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + public class TephraTransactionTable implements PhoenixTransactionalTable { private TransactionAwareHTable transactionAwareHTable; @@ -127,4 +143,161 @@ public class TephraTransactionTable implements PhoenixTransactionalTable { transactionAwareHTable.close(); } + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL); + } + + @Override + public Boolean[] exists(List<Get> gets) throws IOException { + return transactionAwareHTable.exists(gets); + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail); + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + transactionAwareHTable.setAutoFlush(autoFlush); + } + + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + return transactionAwareHTable.getRowOrBefore(row, family); + } + + @Override + public TableName getName() { + return transactionAwareHTable.getName(); + } + + @Override + public boolean[] existsAll(List<Get> gets) throws IOException { + return transactionAwareHTable.existsAll(gets); + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) + throws IOException, InterruptedException { + transactionAwareHTable.batch(actions, results); + } + + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, + InterruptedException { + return transactionAwareHTable.batch(actions); + } + + @Override + public <R> void batchCallback(List<? 
extends Row> actions, + Object[] results, Callback<R> callback) throws IOException, + InterruptedException { + transactionAwareHTable.batchCallback(actions, results, callback); + } + + @Override + public <R> Object[] batchCallback(List<? extends Row> actions, + Callback<R> callback) throws IOException, InterruptedException { + return transactionAwareHTable.batchCallback(actions, callback); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) throws IOException { + return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete); + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + transactionAwareHTable.mutateRow(rm); + } + + @Override + public Result append(Append append) throws IOException { + return transactionAwareHTable.append(append); + } + + @Override + public Result increment(Increment increment) throws IOException { + return transactionAwareHTable.increment(increment); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount) throws IOException { + return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount); + } + + @Override + public 
long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, Durability durability) + throws IOException { + return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability); + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return transactionAwareHTable.coprocessorService(row); + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService( + Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable) throws ServiceException, Throwable { + return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable); + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, + byte[] startKey, byte[] endKey, Call<T, R> callable, + Callback<R> callback) throws ServiceException, Throwable { + transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback); + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype) + throws ServiceException, Throwable { + return transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype); + } + + @Override + public <R extends Message> void batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, + Callback<R> callback) throws ServiceException, Throwable { + transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) + throws IOException { + return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation); + } + }
