initial version of Tephra implementation
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cea251cf Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cea251cf Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cea251cf Branch: refs/heads/omid Commit: cea251cfcb9699a90d10dfe82626c264b9016bc4 Parents: acfc9d5 Author: Ohad Shacham <[email protected]> Authored: Tue Feb 14 15:57:23 2017 +0200 Committer: Ohad Shacham <[email protected]> Committed: Tue Feb 14 15:57:23 2017 +0200 ---------------------------------------------------------------------- .../transaction/OmidTransactionContext.java | 8 +- .../transaction/OmidTransactionTable.java | 8 +- .../transaction/PhoenixTransactionContext.java | 10 +- .../transaction/PhoenixTransactionalTable.java | 5 - .../transaction/TephraTransactionContext.java | 256 +++++++++++++++++-- .../transaction/TephraTransactionTable.java | 79 +++--- 6 files changed, 265 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index bc5b05b..937ac14 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -26,13 +26,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext { } @Override - public void abort(SQLException e) throws SQLException { - // TODO Auto-generated method stub - - } - - @Override - public void checkpoint() throws SQLException { + public void checkpoint(boolean hasUncommittedData) throws SQLException { // TODO Auto-generated method stub } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java index f15fdd3..725fe16 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java @@ -15,7 +15,7 @@ import org.apache.hadoop.hbase.client.Scan; public class OmidTransactionTable implements PhoenixTransactionalTable { - public OmidTransactionTable(PhoenixTransactionContext ctx) { + public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { // TODO Auto-generated constructor stub } @@ -99,12 +99,6 @@ public class OmidTransactionTable implements PhoenixTransactionalTable { } @Override - public HTableInterface getHTable() { - // TODO Auto-generated method stub - return null; - } - - @Override public void setAutoFlush(boolean autoFlush) { // TODO Auto-generated method stub http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index f07640e..87b68f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -29,18 +29,10 @@ public interface PhoenixTransactionContext { public void abort() throws SQLException; /** - * Rollback a transaction - * - * @param e - * @throws SQLException - */ - public void abort(SQLException e) throws SQLException; - - /** * Create a checkpoint in a transaction as defined in [TEPHRA-96] * @throws SQLException */ - public void checkpoint() throws SQLException; + public void checkpoint(boolean hasUncommittedData) throws SQLException; /** * Commit DDL to guarantee that no transaction started before create index http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java index ff2632c..3a43068 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java @@ -101,11 +101,6 @@ public interface PhoenixTransactionalTable { public void delete(List<Delete> deletes) throws IOException; /** - * Return the underling htable - */ - public HTableInterface getHTable(); - - /** * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)} */ public void setAutoFlush(boolean autoFlush); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index 17c70f0..81c9fd1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -1,83 +1,285 @@ package org.apache.phoenix.transaction; import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionAware; +import org.apache.tephra.TransactionConflictException; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionFailureException; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.Transaction.VisibilityLevel; +import org.apache.tephra.visibility.FenceWait; +import org.apache.tephra.visibility.VisibilityFence; + +import com.google.common.collect.Lists; public class TephraTransactionContext implements PhoenixTransactionContext { - @Override - public void begin() throws SQLException { - // TODO Auto-generated method stub + private final List<TransactionAware> txAwares; + private final TransactionContext txContext; + private Transaction tx; + private TransactionSystemClient txServiceClient; + private TransactionFailureException e; - } + public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean threadSafe) { - @Override - public void commit() throws SQLException { - // TODO Auto-generated method stub + this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); // TODO Should be wrapped for Omid side usage + assert(ctx instanceof TephraTransactionContext); + TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx; + + if (threadSafe) { + this.tx = tephraTransactionContext.getTransaction(); + this.txAwares = Lists.newArrayList(); + this.txContext = null; + } else { + this.txAwares = Collections.emptyList(); + if (ctx == null) { + this.txContext = new TransactionContext(txServiceClient); + } else { + this.txContext = tephraTransactionContext.getContext(); + } + } + + this.e = null; } @Override - public void abort() throws SQLException { - // TODO Auto-generated method stub + public void begin() throws SQLException { + if (txContext == null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); + } + try { + txContext.start(); + } catch (TransactionFailureException e) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()) + .setRootCause(e) + .build().buildException(); + } } @Override - public void abort(SQLException e) throws SQLException { - // TODO Auto-generated method stub - + public void commit() throws SQLException { + try { + assert(txContext != null); + txContext.finish(); + } catch (TransactionFailureException e) { + this.e = e; + if (e instanceof TransactionConflictException) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) + .setMessage(e.getMessage()) + .setRootCause(e) + .build().buildException(); + } + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()) + .setRootCause(e) + .build().buildException(); + } } @Override - public void checkpoint() throws SQLException { - // TODO Auto-generated method stub + public void abort() throws SQLException { + try { + if (e != null) { + txContext.abort(e); + e = null; + } else { + txContext.abort(); + } + } catch (TransactionFailureException e) { + this.e = null; + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()) + .setRootCause(e) + .build().buildException(); + } + } + @Override + public void checkpoint(boolean hasUncommittedData) throws SQLException { + if (hasUncommittedData) { + try { + if (txContext == null) { + tx = txServiceClient.checkpoint(tx); + } else { + assert(txContext != null); + txContext.checkpoint(); + tx = txContext.getCurrentTransaction(); + } + } catch (TransactionFailureException e) { + throw new SQLException(e); + } + } + + if (txContext == null) { + tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + } + else { + assert(txContext != null); + txContext.getCurrentTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + } } @Override public void commitDDLFence(PTable dataTable) throws SQLException, InterruptedException, TimeoutException { - // TODO Auto-generated method stub - + byte[] key = dataTable.getName().getBytes(); + try { + FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient); + fenceWait.await(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); + } catch (TimeoutException | TransactionFailureException e) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) + .setSchemaName(dataTable.getSchemaName().getString()) + .setTableName(dataTable.getTableName().getString()) + .build().buildException(); + } } @Override public void markDMLFence(PTable table) { - // TODO Auto-generated method stub - + byte[] logicalKey = table.getName().getBytes(); + TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); + if (this.txContext == null) { + this.txAwares.add(logicalTxAware); + } else { + this.txContext.addTransactionAware(logicalTxAware); + } + byte[] physicalKey = table.getPhysicalName().getBytes(); + if (Bytes.compareTo(physicalKey, logicalKey) != 0) { + TransactionAware physicalTxAware = VisibilityFence.create(physicalKey); + if (this.txContext == null) { + this.txAwares.add(physicalTxAware); + } else { + this.txContext.addTransactionAware(physicalTxAware); + } + } } @Override public void join(PhoenixTransactionContext ctx) { - // TODO Auto-generated method stub + assert(ctx instanceof TephraTransactionContext); + TephraTransactionContext tephraContext = (TephraTransactionContext) ctx; + + tephraContext.getAwares(); + if (txContext != null) { + for (TransactionAware txAware : tephraContext.getAwares()) { + txContext.addTransactionAware(txAware); + } + } else { + txAwares.addAll(tephraContext.getAwares()); + } } - @Override + @Override public boolean isTransactionRunning() { - // TODO Auto-generated method stub + if (this.txContext != null) { + return (this.txContext.getCurrentTransaction() != null) ? true : false; + } + + if (this.tx != null) { + return true; + } + return false; } @Override public void reset() { - // TODO Auto-generated method stub - + tx = null; + txAwares.clear(); } @Override public long getTransactionId() { - // TODO Auto-generated method stub - return 0; + if (this.txContext != null) { + return txContext.getCurrentTransaction().getTransactionId(); + } + + if (tx != null) { + return tx.getTransactionId(); + } + + return HConstants.LATEST_TIMESTAMP; } @Override public long getReadPointer() { - // TODO Auto-generated method stub - return 0; + if (this.txContext != null) { + return txContext.getCurrentTransaction().getReadPointer(); + } + + if (tx != null) { + return tx.getReadPointer(); + } + + return (-1); } + /** + * TephraTransactionContext specific functions + */ + + Transaction getTransaction() { + return this.tx; + } + + TransactionContext getContext() { + return this.txContext; + } + + List<TransactionAware> getAwares() { + return txAwares; + } + + void addTransactionAware(TransactionAware txAware) { + if (this.txContext != null) { + txContext.addTransactionAware(txAware); + } else if (this.tx != null) { + txAwares.add(txAware); + } + } + + // For testing + public long getWritePointer() { + if (this.txContext != null) { + return txContext.getCurrentTransaction().getWritePointer(); + } + + if (tx != null) { + return tx.getWritePointer(); + } + + return HConstants.LATEST_TIMESTAMP; + } + + // For testing + public VisibilityLevel getVisibilityLevel() { + if (this.txContext != null) { + return txContext.getCurrentTransaction().getVisibilityLevel(); + } + + if (tx != null) { + return tx.getVisibilityLevel(); + } + + return null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java index 0d788c1..c5ba33f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java @@ -12,132 +12,119 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.tephra.hbase.TransactionAwareHTable; public class TephraTransactionTable implements PhoenixTransactionalTable { - public TephraTransactionTable(PhoenixTransactionContext ctx) { - // TODO Auto-generated constructor stub + private TransactionAwareHTable transactionAwareHTable; + + private TephraTransactionContext tephraTransactionContext; + + public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { + + assert(ctx instanceof TephraTransactionContext); + + tephraTransactionContext = (TephraTransactionContext) ctx; + + transactionAwareHTable = new TransactionAwareHTable(hTable); + + tephraTransactionContext.addTransactionAware(transactionAwareHTable); } @Override public Result get(Get get) throws IOException { - // TODO Auto-generated method stub - return null; + return transactionAwareHTable.get(get); } @Override public void put(Put put) throws IOException { - // TODO Auto-generated method stub - + transactionAwareHTable.put(put); } @Override public void delete(Delete delete) throws IOException { - // TODO Auto-generated method stub - + transactionAwareHTable.delete(delete); } @Override public ResultScanner getScanner(Scan scan) throws IOException { - // TODO Auto-generated method stub - return null; + return transactionAwareHTable.getScanner(scan); } @Override public byte[] getTableName() { - // TODO Auto-generated method stub - return null; + return transactionAwareHTable.getTableName(); } @Override public Configuration getConfiguration() { - // TODO Auto-generated method stub - return null; + return transactionAwareHTable.getConfiguration(); } @Override public HTableDescriptor getTableDescriptor() throws IOException { - // TODO Auto-generated method stub - return null; + return transactionAwareHTable.getTableDescriptor(); } @Override public boolean exists(Get get) throws IOException { - // TODO Auto-generated method stub - return false; + return transactionAwareHTable.exists(get); } @Override public Result[] get(List<Get> gets) throws IOException { - // TODO Auto-generated method stub - return null; + return transactionAwareHTable.get(gets); } @Override public ResultScanner getScanner(byte[] family) throws IOException { - // TODO Auto-generated method stub - return null; + return transactionAwareHTable.getScanner(family); } @Override public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { - // TODO Auto-generated method stub - return null; + return transactionAwareHTable.getScanner(family, qualifier); } @Override public void put(List<Put> puts) throws IOException { - // TODO Auto-generated method stub - + transactionAwareHTable.put(puts); } @Override public void delete(List<Delete> deletes) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public HTableInterface getHTable() { - // TODO Auto-generated method stub - return null; + transactionAwareHTable.delete(deletes); } @Override public void setAutoFlush(boolean autoFlush) { - // TODO Auto-generated method stub - + transactionAwareHTable.setAutoFlush(autoFlush); } @Override public boolean isAutoFlush() { - // TODO Auto-generated method stub - return false; + return transactionAwareHTable.isAutoFlush(); } @Override public long getWriteBufferSize() { - // TODO Auto-generated method stub - return 0; + return transactionAwareHTable.getWriteBufferSize(); } @Override public void setWriteBufferSize(long writeBufferSize) throws IOException { - // TODO Auto-generated method stub - + transactionAwareHTable.setWriteBufferSize(writeBufferSize); } @Override public void flushCommits() throws IOException { - // TODO Auto-generated method stub - + transactionAwareHTable.flushCommits(); } @Override public void close() throws IOException { - // TODO Auto-generated method stub - + transactionAwareHTable.close(); } }
