fix bugs
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f584e5f1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f584e5f1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f584e5f1 Branch: refs/heads/omid Commit: f584e5f1a53cfaecb309ecf3201011a1579ebf47 Parents: fa69563 Author: Ohad Shacham <[email protected]> Authored: Mon Mar 27 08:58:15 2017 +0300 Committer: Ohad Shacham <[email protected]> Committed: Mon Mar 27 08:58:15 2017 +0300 ---------------------------------------------------------------------- .../apache/phoenix/execute/PartialCommitIT.java | 4 +- .../phoenix/tx/FlappingTransactionIT.java | 27 ++-- .../phoenix/cache/IndexMetaDataCache.java | 7 +- .../coprocessor/BaseScannerRegionObserver.java | 4 +- .../phoenix/coprocessor/ScanRegionObserver.java | 5 +- .../apache/phoenix/execute/MutationState.java | 51 +++----- .../index/IndexMetaDataCacheFactory.java | 15 ++- .../phoenix/index/PhoenixIndexMetaData.java | 14 +-- .../index/PhoenixTransactionalIndexer.java | 35 +++--- .../apache/phoenix/jdbc/PhoenixConnection.java | 3 +- .../transaction/OmidTransactionContext.java | 18 +++ .../transaction/PhoenixTransactionContext.java | 29 ++++- .../transaction/TephraTransactionContext.java | 87 ++++++++++++- .../phoenix/transaction/TransactionFactory.java | 126 +++++++++++++++++++ .../apache/phoenix/util/TransactionUtil.java | 8 +- 15 files changed, 325 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index a5555f3..636cd84 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -271,11 +271,11 @@ public class PartialCommitIT extends BaseOwnClusterIT { return new PhoenixConnection(phxCon, null) { @Override protected MutationState newMutationState(int maxSize) { - return new MutationState(maxSize, this, mutations, null, null); + return new MutationState(maxSize, this, mutations, false, null); }; }; } - + public static class FailingRegionObserver extends SimpleRegionObserver { @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java index 5a990cf..d34f403 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java @@ -42,6 +42,9 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionalTable; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; import org.apache.tephra.TransactionContext; @@ -228,15 +231,17 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { } // Use HBase level Tephra APIs to start a new transaction - TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW); - TransactionContext txContext = new TransactionContext(txServiceClient, txAware); - txContext.start(); - + //TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW); + PhoenixTransactionContext txContext = TransactionFactory.getTransactionFactory().getTransactionContext(pconn); + PhoenixTransactionalTable txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable); + + txContext.begin(); + // Use HBase APIs to add a new row Put put = new Put(Bytes.toBytes("z")); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b")); - txAware.put(put); + txTable.put(put); // Use Phoenix APIs to add new row (sharing the transaction context) pconn.setTransactionContext(txContext); @@ -259,7 +264,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertEquals(3,rs.getInt(1)); // Use Tephra APIs directly to finish (i.e. commit) the transaction - txContext.finish(); + txContext.commit(); // Confirm that attempt to commit row with conflict fails try { @@ -279,14 +284,16 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { } // Repeat the same as above, but this time abort the transaction - txContext = new TransactionContext(txServiceClient, txAware); - txContext.start(); + txContext = TransactionFactory.getTransactionFactory().getTransactionContext(pconn); + txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable); + + txContext.begin(); // Use HBase APIs to add a new row put = new Put(Bytes.toBytes("j")); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e")); - txAware.put(put); + txTable.put(put); // Use Phoenix APIs to add new row (sharing the transaction context) pconn.setTransactionContext(txContext); @@ -325,4 +332,4 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertTrue(result.isEmpty()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java index d22993c..16207c8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java @@ -23,9 +23,8 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import org.apache.tephra.Transaction; - import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.transaction.PhoenixTransactionContext; public interface IndexMetaDataCache extends Closeable { public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() { @@ -40,11 +39,11 @@ public interface IndexMetaDataCache extends Closeable { } @Override - public Transaction getTransaction() { + public PhoenixTransactionContext getTransactionContext() { return null; } }; public List<IndexMaintainer> getIndexMaintainers(); - public Transaction getTransaction(); + public PhoenixTransactionContext getTransactionContext(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index f6bd512..321d117 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -315,7 +315,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final byte[][] viewConstants, final TupleProjector projector, final ImmutableBytesWritable ptr) { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr); + dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr); } /** @@ -330,7 +330,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param tupleProjector * @param dataRegion * @param indexMaintainer - * @param tx current transaction * @param viewConstants */ protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, @@ -338,7 +337,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final Region dataRegion, final IndexMaintainer indexMaintainer, - Transaction tx, final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, final TupleProjector projector, final ImmutableBytesWritable ptr) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index ade88db..0e0e3ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -204,7 +204,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { Region dataRegion = null; IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; - Transaction tx = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -213,15 +212,13 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes); indexMaintainer = indexMaintainers.get(0); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); - byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); - tx = MutationState.decodeTransaction(txState); } final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, - dataColumns, tupleProjector, dataRegion, indexMaintainer, tx, + dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index c480e30..23c8b2a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -87,7 +87,7 @@ import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; import org.apache.phoenix.transaction.PhoenixTransactionalTable; -import org.apache.phoenix.transaction.TephraTransactionContext; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; @@ -97,17 +97,6 @@ import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TransactionUtil; -import org.apache.tephra.Transaction; -import org.apache.tephra.Transaction.VisibilityLevel; -import org.apache.tephra.TransactionAware; -import org.apache.tephra.TransactionCodec; -import org.apache.tephra.TransactionConflictException; -import org.apache.tephra.TransactionContext; -import org.apache.tephra.TransactionFailureException; -import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.hbase.TransactionAwareHTable; -import org.apache.tephra.visibility.FenceWait; -import org.apache.tephra.visibility.VisibilityFence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,7 +113,6 @@ import com.google.common.collect.Sets; */ public class MutationState implements SQLCloseable { private static final Logger logger = LoggerFactory.getLogger(MutationState.class); - private static final TransactionCodec CODEC = new TransactionCodec(); private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0]; private static final int MAX_COMMIT_RETRIES = 3; @@ -183,20 +171,20 @@ public class MutationState implements SQLCloseable { : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; if (subTask == false) { if (txContext == null) { - phoenixTransactionContext = new TephraTransactionContext(connection); + phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection); } else { isExternalTxContext = true; - phoenixTransactionContext = new TephraTransactionContext(txContext, connection, subTask); + phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask); } } else { // this code path is only used while running child scans, we can't pass the txContext to child scans // as it is not thread safe, so we use the tx member variable - phoenixTransactionContext = new TephraTransactionContext(txContext, connection, subTask); + phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask); } } public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { - this(maxSize, connection, true, connection.getMutationState().getPhoenixTransactionContext(), sizeOffset); + this(maxSize, connection, false, null, sizeOffset); this.mutations.put(table, mutations); this.numRows = mutations.size(); throwIfTooBig(); @@ -220,7 +208,7 @@ public class MutationState implements SQLCloseable { * @param dataTable the data table upon which an index is being added * @throws SQLException */ - public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException { + public void commitDDLFence(PTable dataTable) throws SQLException { if (dataTable.isTransactional()) { try { phoenixTransactionContext.commitDDLFence(dataTable, logger); @@ -305,13 +293,11 @@ public class MutationState implements SQLCloseable { // would not change as these threads are running. public HTableInterface getHTable(PTable table) throws SQLException { HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); - Transaction currentTx; - if (table.isTransactional() && (currentTx=getTransaction()) != null) { - TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table.isImmutableRows()); + if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) { + PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table.isImmutableRows()); // Using cloned mutationState as we may have started a new transaction already // if auto commit is true and we need to use the original one here. - txAware.startTx(currentTx); - htable = txAware; + htable = phoenixTransactionTable; } return htable; } @@ -440,7 +426,7 @@ public class MutationState implements SQLCloseable { return; } - phoenixTransactionContext.join(getPhoenixTransactionContext()); + phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); this.sizeOffset += newMutationState.sizeOffset; joinMutationState(newMutationState.mutations, this.mutations); @@ -1090,17 +1076,9 @@ public class MutationState implements SQLCloseable { } public byte[] encodeTransaction() throws SQLException { - try { - return CODEC.encode(getTransaction()); - } catch (IOException e) { - throw new SQLException(e); - } + return phoenixTransactionContext.encodeTransaction(); } - public static Transaction decodeTransaction(byte[] txnBytes) throws IOException { - return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes); - } - private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations, ImmutableBytesWritable indexMetaDataPtr) throws SQLException { PTable table = tableRef.getTable(); @@ -1333,12 +1311,13 @@ public class MutationState implements SQLCloseable { * @throws SQLException */ public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException { - Transaction currentTx = getTransaction(); - if (currentTx != null) { + + if (phoenixTransactionContext.isTransactionRunning()) { // Initialize visibility so that transactions see their own writes. // The checkpoint() method will set it to not see writes if necessary. - currentTx.setVisibility(VisibilityLevel.SNAPSHOT); + phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT); } + Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){ @Override public boolean apply(TableRef tableRef) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 56849fe..8658524 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -24,15 +24,13 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; -import org.apache.tephra.Transaction; - import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; -import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; -import org.apache.phoenix.util.TransactionUtil; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.TransactionFactory; public class IndexMetaDataCacheFactory implements ServerCacheFactory { public IndexMetaDataCacheFactory() { @@ -49,11 +47,12 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { @Override public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException { // just use the standard keyvalue builder - this doesn't really need to be fast + final List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE); - final Transaction txn; + final PhoenixTransactionContext txnContext; try { - txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null; + txnContext = txState.length != 0 ? TransactionFactory.getTransactionFactory().getTransactionContext(txState) : null; } catch (IOException e) { throw new SQLException(e); } @@ -70,8 +69,8 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { } @Override - public Transaction getTransaction() { - return txn; + public PhoenixTransactionContext getTransactionContext() { + return txnContext; } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index d22e957..82fe2f3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -30,12 +30,12 @@ import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ServerUtil; -import org.apache.tephra.Transaction; public class PhoenixIndexMetaData implements IndexMetaData { private final Map<String, byte[]> attributes; @@ -51,7 +51,7 @@ public class PhoenixIndexMetaData implements IndexMetaData { byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); if (md != null) { final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md); - final Transaction txn = MutationState.decodeTransaction(txState); + final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState); return new IndexMetaDataCache() { @Override @@ -63,8 +63,8 @@ public class PhoenixIndexMetaData implements IndexMetaData { } @Override - public Transaction getTransaction() { - return txn; + public PhoenixTransactionContext getTransactionContext() { + return txnContext; } }; @@ -96,8 +96,8 @@ public class PhoenixIndexMetaData implements IndexMetaData { this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null; } - public Transaction getTransaction() { - return indexMetaDataCache.getTransaction(); + public PhoenixTransactionContext getTransactionContext() { + return indexMetaDataCache.getTransactionContext(); } public List<IndexMaintainer> getIndexMaintainers() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index fdf5498..a418c24 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -69,14 +69,14 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; +import org.apache.phoenix.transaction.PhoenixTransactionalTable; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TransactionUtil; -import org.apache.tephra.Transaction; -import org.apache.tephra.Transaction.VisibilityLevel; -import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.TransactionAwareHTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -149,7 +149,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Map<String,byte[]> updateAttributes = m.getAttributesMap(); PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); - byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY); + byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY); Collection<Pair<Mutation, byte[]>> indexUpdates = null; // get the current span, or just use a null-span to avoid a bunch of if statements try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { @@ -186,14 +186,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException { - Transaction tx = indexMetaData.getTransaction(); - if (tx == null) { + PhoenixTransactionContext txnContext = indexMetaData.getTransactionContext(); + if (txnContext == null) { throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString()); } boolean isRollback = txRollbackAttribute!=null; boolean isImmutable = indexMetaData.isImmutableRows(); ResultScanner currentScanner = null; - TransactionAwareHTable txTable = null; + PhoenixTransactionalTable txTable = null; // Collect up all mutations in batch Map<ImmutableBytesPtr, MultiMutation> mutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); @@ -250,23 +250,22 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { scanRanges.initializeScan(scan); TableName tableName = env.getRegion().getRegionInfo().getTable(); HTableInterface htable = env.getTable(tableName); - txTable = new TransactionAwareHTable(htable); - txTable.startTx(tx); + txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(txnContext, htable); // For rollback, we need to see all versions, including // the last committed version as there may be multiple // checkpointed versions. SkipScanFilter filter = scanRanges.getSkipScanFilter(); if (isRollback) { filter = new SkipScanFilter(filter,true); - tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL); + txnContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL); } scan.setFilter(filter); currentScanner = txTable.getScanner(scan); } if (isRollback) { - processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations); + processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, txnContext, mutableColumns, indexUpdates, mutations); } else { - processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations, findPriorValueMutations); + processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, txnContext, mutableColumns, indexUpdates, mutations, findPriorValueMutations); } } finally { if (txTable != null) txTable.close(); @@ -289,7 +288,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private void processMutation(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, ResultScanner scanner, - Transaction tx, + PhoenixTransactionContext txnContext, Set<ColumnReference> upsertColumns, Collection<Pair<Mutation, byte[]>> indexUpdates, Map<ImmutableBytesPtr, MultiMutation> mutations, @@ -300,14 +299,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // Process existing data table rows by removing the old index row and adding the new index row while ((result = scanner.next()) != null) { Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); - TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result); + TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m, emptyColRef, result); generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); generatePuts(indexMetaData, indexUpdates, state); } } // Process new data table by adding new index rows for (Mutation m : mutations.values()) { - TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); + TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m); generatePuts(indexMetaData, indexUpdates, state); } } @@ -315,7 +314,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private void processRollback(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, ResultScanner scanner, - Transaction tx, Set<ColumnReference> mutableColumns, + PhoenixTransactionContext tx, Set<ColumnReference> mutableColumns, Collection<Pair<Mutation, byte[]>> indexUpdates, Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException { if (scanner != null) { @@ -402,7 +401,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); for (IndexUpdate delete : deletes) { if (delete.isValid()) { - delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); + delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index cb2390e..d387ab7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -103,6 +103,7 @@ import org.apache.phoenix.schema.types.PUnsignedDate; import org.apache.phoenix.schema.types.PUnsignedTime; import org.apache.phoenix.schema.types.PUnsignedTimestamp; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.NumberUtil; @@ -629,7 +630,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea mutationState.sendUncommitted(); } - public void setTransactionContext(TransactionContext txContext) throws SQLException { + public void setTransactionContext(PhoenixTransactionContext txContext) throws SQLException { if (!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT) .build().buildException(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index 596cf73..8a4e284 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -85,4 +85,22 @@ public class OmidTransactionContext implements PhoenixTransactionContext { // TODO Auto-generated method stub return null; } + + @Override + public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) { + // TODO Auto-generated method stub + + } + + @Override + public byte[] encodeTransaction() throws SQLException { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getMaxTransactionsPerSecond() { + // TODO Auto-generated method stub + return 0; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index 2d0d5b7..bd63930 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -1,7 +1,6 @@ package org.apache.phoenix.transaction; import org.apache.phoenix.schema.PTable; -import org.apache.tephra.Transaction.VisibilityLevel; import org.slf4j.Logger; import java.sql.SQLException; @@ -20,6 +19,8 @@ public interface PhoenixTransactionContext { SNAPSHOT_ALL } + public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "phoenix.tx.rollback"; + /** * Starts a transaction * @@ -87,20 +88,36 @@ public interface PhoenixTransactionContext { /** * Returns transaction unique identifier */ - long getTransactionId(); + public long getTransactionId(); /** * Returns transaction snapshot id */ - long getReadPointer(); + public long getReadPointer(); /** * Returns transaction write pointer. After checkpoint the write pointer is different than the initial one */ - long getWritePointer(); + public long getWritePointer(); + + /** + * Set visibility level + */ + public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel); /** - * Returns visibility level + * Returns visibility level + */ + public PhoenixVisibilityLevel getVisibilityLevel(); + + /** + * Encode transaction + */ + public byte[] encodeTransaction() throws SQLException; + + /** + * + * @return max transactions per second */ - PhoenixVisibilityLevel getVisibilityLevel(); + public long getMaxTransactionsPerSecond(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index f8096d5..cfa3ac3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -1,5 +1,6 @@ package org.apache.phoenix.transaction; +import java.io.IOException; import java.sql.SQLException; import java.util.Collections; import java.util.List; @@ -11,14 +12,20 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionAware; +import org.apache.tephra.TransactionCodec; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionFailureException; +import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.Transaction.VisibilityLevel; +import org.apache.tephra.TxConstants; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.visibility.FenceWait; import org.apache.tephra.visibility.VisibilityFence; @@ -28,12 +35,24 @@ import org.slf4j.Logger; public class TephraTransactionContext implements PhoenixTransactionContext { + private static final TransactionCodec CODEC = new TransactionCodec(); + private final List<TransactionAware> txAwares; private final TransactionContext txContext; private Transaction tx; private TransactionSystemClient txServiceClient; private TransactionFailureException e; + public TephraTransactionContext() { + this.txServiceClient = null; + this.txAwares = Lists.newArrayList(); + this.txContext = null; + } + + public TephraTransactionContext(byte[] txnBytes) throws IOException { + this(); + this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC.decode(txnBytes) : null; + } public TephraTransactionContext(PhoenixConnection connection) { this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); @@ -65,6 +84,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext { throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); } + System.out.println("BEGIN"); try { txContext.start(); } catch (TransactionFailureException e) { @@ -150,6 +170,14 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } } + private Transaction getCurrentTransaction() { + if (this.txContext != null) { + return this.txContext.getCurrentTransaction(); + } + + return this.tx; + } + @Override public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException { byte[] key = dataTable.getName().getBytes(); @@ -159,7 +187,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext { fenceWait.await(10000, TimeUnit.MILLISECONDS); if (logger.isInfoEnabled()) { - logger.info("Added write fence at ~" + getTransaction().getReadPointer()); + logger.info("Added write fence at ~" + getCurrentTransaction().getReadPointer()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -199,8 +227,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext { assert(ctx instanceof TephraTransactionContext); TephraTransactionContext tephraContext = (TephraTransactionContext) ctx; - tephraContext.getAwares(); - if (txContext != null) { for (TransactionAware txAware : tephraContext.getAwares()) { txContext.addTransactionAware(txAware); @@ -269,6 +295,33 @@ public class TephraTransactionContext implements PhoenixTransactionContext { return HConstants.LATEST_TIMESTAMP; } + @Override + public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) { + VisibilityLevel tephraVisibilityLevel = null; + + switch(visibilityLevel) { + case SNAPSHOT: + tephraVisibilityLevel = VisibilityLevel.SNAPSHOT; + break; + case SNAPSHOT_EXCLUDE_CURRENT: + tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + break; + case SNAPSHOT_ALL: + tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL; + break; + default: + assert(false); + } + + if (this.txContext != null) { + txContext.getCurrentTransaction().setVisibility(tephraVisibilityLevel); + } else if (tx != null) { + tx.setVisibility(tephraVisibilityLevel); + } else { + assert(false); + } + } + // For testing @Override public PhoenixVisibilityLevel getVisibilityLevel() { @@ -297,7 +350,33 @@ public class TephraTransactionContext implements PhoenixTransactionContext { return phoenixVisibilityLevel; } - /** + @Override + public byte[] encodeTransaction() throws SQLException { + + Transaction transaction = null; + + if (this.txContext != null) { + transaction = txContext.getCurrentTransaction(); + } else if (tx != null) { + transaction = tx; + } + + assert (transaction != null); + + try { + return CODEC.encode(transaction); + } catch (IOException e) { + throw new SQLException(e); + } + } + + @Override + public long getMaxTransactionsPerSecond() { + return TxConstants.MAX_TX_PER_MS; + } + + + /** * TephraTransactionContext specific functions */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java new file mode 100644 index 0000000..ba80d02 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java @@ -0,0 +1,126 @@ +package org.apache.phoenix.transaction; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.jdbc.PhoenixConnection; + +public class TransactionFactory { + + static private TransactionFactory transactionFactory = null; + + private TransactionProcessor tp = TransactionProcessor.Tephra; + + enum TransactionProcessor { + Tephra, + Omid + } + + private TransactionFactory(TransactionProcessor tp) { + this.tp = tp; + } + + static public void createTransactionFactory(TransactionProcessor tp) { + if (transactionFactory == null) { + transactionFactory = new TransactionFactory(tp); + } + } + + static public TransactionFactory getTransactionFactory() { + if (transactionFactory == null) { + createTransactionFactory(TransactionProcessor.Tephra); + } + + return transactionFactory; + } + + public PhoenixTransactionContext getTransactionContext() { + + PhoenixTransactionContext ctx = null; + + switch(tp) { + case Tephra: + ctx = new TephraTransactionContext(); + break; + case Omid: + ctx = new OmidTransactionContext(); + break; + default: + ctx = null; + } + + return ctx; + } + + public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException { + + PhoenixTransactionContext ctx = null; + + switch(tp) { + case Tephra: + ctx = new TephraTransactionContext(txnBytes); + break; + case Omid: +// ctx = new OmidTransactionContext(txnBytes); + break; + default: + ctx = null; + } + + return ctx; + } + + public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) { + + PhoenixTransactionContext ctx = null; + + switch(tp) { + case Tephra: + ctx = new TephraTransactionContext(connection); + break; + case Omid: +// ctx = new OmidTransactionContext(connection); + break; + default: + ctx = null; + } + + return ctx; + } + + public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) { + + PhoenixTransactionContext ctx = null; + + switch(tp) { + case Tephra: + ctx = new TephraTransactionContext(contex, connection, subTask); + break; + case Omid: +// ctx = new OmidTransactionContext(contex, connection, subTask); + break; + default: + ctx = null; + } + + return ctx; + } + + public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) { + + PhoenixTransactionalTable table = null; + + switch(tp) { + case Tephra: + table = new TephraTransactionTable(ctx, htable); + break; + case Omid: +// table = new OmidTransactionContext(contex, connection, subTask); + break; + default: + table = null; + } + + return table; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index 4fbbe57..94a56b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -32,10 +32,8 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.transaction.PhoenixTransactionalTable; import org.apache.phoenix.transaction.TephraTransactionTable; -import org.apache.tephra.TransactionConflictException; -import org.apache.tephra.TransactionFailureException; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.TransactionAwareHTable; public class TransactionUtil { private TransactionUtil() { @@ -46,11 +44,11 @@ public class TransactionUtil { } public static long convertToNanoseconds(long serverTimeStamp) { - return serverTimeStamp * TxConstants.MAX_TX_PER_MS; + return serverTimeStamp * TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond(); } public static long convertToMilliseconds(long serverTimeStamp) { - return serverTimeStamp / TxConstants.MAX_TX_PER_MS; + return serverTimeStamp / TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond(); } public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, boolean isImmutableRows) {
