Connect TAL to Phoenix
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2f2f6cc5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2f2f6cc5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2f2f6cc5 Branch: refs/heads/4.x-HBase-1.1 Commit: 2f2f6cc57b5ffe2dad973776eab209243b12397a Parents: 781cb3a Author: Ohad Shacham <[email protected]> Authored: Sun Jun 25 13:10:10 2017 +0300 Committer: Thomas <[email protected]> Committed: Mon Jun 26 12:09:00 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/execute/PartialCommitIT.java | 4 +- .../phoenix/tx/FlappingTransactionIT.java | 41 +- .../phoenix/tx/ParameterizedTransactionIT.java | 7 +- .../org/apache/phoenix/tx/TransactionIT.java | 21 +- .../org/apache/phoenix/tx/TxCheckpointIT.java | 63 +-- .../phoenix/cache/IndexMetaDataCache.java | 7 +- .../coprocessor/BaseScannerRegionObserver.java | 1 - .../PhoenixTransactionalProcessor.java | 4 +- .../phoenix/coprocessor/ScanRegionObserver.java | 1 + .../UngroupedAggregateRegionObserver.java | 6 +- .../apache/phoenix/execute/MutationState.java | 375 +++++--------- .../apache/phoenix/index/IndexMaintainer.java | 4 +- .../index/IndexMetaDataCacheFactory.java | 15 +- .../phoenix/index/PhoenixIndexMetaData.java | 14 +- .../index/PhoenixTransactionalIndexer.java | 35 +- .../NonAggregateRegionScannerFactory.java | 5 +- .../phoenix/iterate/RegionScannerFactory.java | 4 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 4 +- .../phoenix/query/ConnectionQueryServices.java | 2 - .../query/ConnectionQueryServicesImpl.java | 62 +-- .../query/ConnectionlessQueryServicesImpl.java | 15 +- .../query/DelegateConnectionQueryServices.java | 6 - .../apache/phoenix/schema/MetaDataClient.java | 4 +- .../org/apache/phoenix/schema/PTableImpl.java | 6 +- .../transaction/OmidTransactionContext.java | 174 +++++++ .../transaction/OmidTransactionTable.java | 339 ++++++++++++ .../transaction/PhoenixTransactionContext.java | 191 +++++++ .../transaction/PhoenixTransactionalTable.java | 149 ++++++ .../transaction/TephraTransactionContext.java | 516 +++++++++++++++++++ .../transaction/TephraTransactionTable.java | 330 ++++++++++++ .../phoenix/transaction/TransactionFactory.java | 143 +++++ .../java/org/apache/phoenix/util/IndexUtil.java | 4 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 4 +- .../apache/phoenix/util/TransactionUtil.java | 31 +- .../java/org/apache/phoenix/query/BaseTest.java | 68 +-- 35 files changed, 2133 insertions(+), 522 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index cd0c371..0747541 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -271,11 +271,11 @@ public class PartialCommitIT extends BaseOwnClusterIT { return new PhoenixConnection(phxCon, null) { @Override protected MutationState newMutationState(int maxSize, int maxSizeBytes) { - return new MutationState(maxSize, maxSizeBytes, this, mutations, null, null); + return new MutationState(maxSize, maxSizeBytes, this, mutations, false, null); }; }; } - + public static class FailingRegionObserver extends SimpleRegionObserver { @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java index 5a990cf..06eac6c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java @@ -42,12 +42,11 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionalTable; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TransactionContext; -import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.TransactionAwareHTable; import org.junit.Test; /** @@ -213,8 +212,6 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { String fullTableName = generateUniqueName(); PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient(); - Statement stmt = conn.createStatement(); stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true"); HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName)); @@ -227,16 +224,18 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertEquals(1,rs.getInt(1)); } - // Use HBase level Tephra APIs to start a new transaction - TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW); - TransactionContext txContext = new TransactionContext(txServiceClient, txAware); - txContext.start(); - + PhoenixTransactionContext txContext = + TransactionFactory.getTransactionFactory().getTransactionContext(pconn); + PhoenixTransactionalTable txTable = + TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable); + + txContext.begin(); + // Use HBase APIs to add a new row Put put = new Put(Bytes.toBytes("z")); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b")); - txAware.put(put); + txTable.put(put); // Use Phoenix APIs to add new row (sharing the transaction context) pconn.setTransactionContext(txContext); @@ -258,8 +257,8 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals(3,rs.getInt(1)); - // Use Tephra APIs directly to finish (i.e. commit) the transaction - txContext.finish(); + // Use TM APIs directly to finish (i.e. commit) the transaction + txContext.commit(); // Confirm that attempt to commit row with conflict fails try { @@ -279,14 +278,18 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { } // Repeat the same as above, but this time abort the transaction - txContext = new TransactionContext(txServiceClient, txAware); - txContext.start(); + txContext = + TransactionFactory.getTransactionFactory().getTransactionContext(pconn); + txTable = + TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable); + + txContext.begin(); // Use HBase APIs to add a new row put = new Put(Bytes.toBytes("j")); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e")); - txAware.put(put); + txTable.put(put); // Use Phoenix APIs to add new row (sharing the transaction context) pconn.setTransactionContext(txContext); @@ -302,7 +305,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals(4,rs.getInt(1)); - // Use Tephra APIs directly to abort (i.e. rollback) the transaction + // Use TM APIs directly to abort (i.e. rollback) the transaction txContext.abort(); rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); @@ -325,4 +328,4 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertTrue(result.isEmpty()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java index a5c1cf4..fecfd9a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java @@ -53,10 +53,10 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TxConstants; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -391,7 +391,10 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { admin.createTable(desc); ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; conn.createStatement().execute(ddl); - assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA)); + + HTableDescriptor htableDescriptor = admin.getTableDescriptor(TableName.valueOf(t2)); + String str = htableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA); + assertEquals(Boolean.TRUE.toString(), str); // Should be ok, as HBase metadata should match existing metadata. ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index f37d09b..c76e19c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -43,10 +43,10 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TxConstants; import org.junit.Test; public class TransactionIT extends ParallelStatsDisabledIT { @@ -147,21 +147,24 @@ public class TransactionIT extends ParallelStatsDisabledIT { for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions()); assertEquals(1000, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL); + assertEquals(1000, Integer.parseInt(propertyTTL)); } desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX1")); for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions()); assertEquals(1000, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL); + assertEquals(1000, Integer.parseInt(propertyTTL)); } desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX2")); for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions()); assertEquals(1000, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL); + assertEquals(1000, Integer.parseInt(propertyTTL)); } conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "2(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)"); @@ -170,14 +173,15 @@ public class TransactionIT extends ParallelStatsDisabledIT { for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(10, colDesc.getMaxVersions()); assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive()); - assertEquals(null, colDesc.getValue(TxConstants.PROPERTY_TTL)); + assertEquals(null, colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TTL=1000"); desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes( nonTxTableName + "2")); for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(10, colDesc.getMaxVersions()); assertEquals(1000, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL); + assertEquals(1000, Integer.parseInt(propertyTTL)); } conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "3(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)"); @@ -207,7 +211,8 @@ public class TransactionIT extends ParallelStatsDisabledIT { for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions()); assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL); + assertEquals(1000, Integer.parseInt(propertyTTL)); } } @@ -291,4 +296,4 @@ public class TransactionIT extends ParallelStatsDisabledIT { conn.close(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index cb3b4b3..989a97e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -37,9 +37,9 @@ import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; -import org.apache.tephra.Transaction.VisibilityLevel; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -53,21 +53,21 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded) { StringBuilder optionBuilder = new StringBuilder(); - this.localIndex = localIndex; - if (!columnEncoded) { - if (optionBuilder.length()!=0) - optionBuilder.append(","); - optionBuilder.append("COLUMN_ENCODED_BYTES=0"); - } - if (!mutable) { - if (optionBuilder.length()!=0) - optionBuilder.append(","); - optionBuilder.append("IMMUTABLE_ROWS=true"); - if (!columnEncoded) { - optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); - } - } - this.tableDDLOptions = optionBuilder.toString(); + this.localIndex = localIndex; + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } + if (!mutable) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } + this.tableDDLOptions = optionBuilder.toString(); } private static Connection getConnection() throws SQLException { @@ -83,8 +83,8 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { - { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, - { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true } + { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, + { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true } }); } @@ -101,7 +101,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { Connection conn = getConnection(props); conn.setAutoCommit(true); conn.createStatement().execute("CREATE SEQUENCE "+seqName); - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)"+tableDDLOptions); conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(val)"); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (NEXT VALUE FOR " + seqName + ",1)"); @@ -132,11 +132,12 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { } private void testRollbackOfUncommittedDelete(String indexDDL, String fullTableName) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = getConnection(); conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions); stmt.execute(indexDDL); stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')"); @@ -220,11 +221,13 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { String tableName = "TBL_" + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(tableName, tableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = getConnection()) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + + tableDDLOptions); stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)"); @@ -266,7 +269,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { long wp = state.getWritePointer(); conn.createStatement().execute( "upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + ""); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertEquals(wp, state.getWritePointer()); // Make sure write ptr // didn't move @@ -278,7 +281,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { conn.createStatement().execute( "upsert into " + fullTableName + " select max(id)+1, 'a5', 'b5' from " + fullTableName + ""); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr // moves @@ -291,7 +294,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { conn.createStatement().execute( "upsert into " + fullTableName + " select max(id)+1, 'a6', 'b6' from " + fullTableName + ""); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr // moves @@ -313,8 +316,10 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { try (Connection conn = getConnection()) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + tableDDLOptions); - stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + + tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + + tableDDLOptions); stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "1 (FK1B)"); @@ -328,7 +333,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { state.startTransaction(); long wp = state.getWritePointer(); conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1"); - assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel()); + assertEquals(PhoenixVisibilityLevel.SNAPSHOT, state.getVisibilityLevel()); assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1"); @@ -346,7 +351,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1"); @@ -363,7 +368,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { stmt.executeUpdate("upsert into " + fullTableName + "2 values (2, 4)"); conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java index d22993c..16207c8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java @@ -23,9 +23,8 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import org.apache.tephra.Transaction; - import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.transaction.PhoenixTransactionContext; public interface IndexMetaDataCache extends Closeable { public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() { @@ -40,11 +39,11 @@ public interface IndexMetaDataCache extends Closeable { } @Override - public Transaction getTransaction() { + public PhoenixTransactionContext getTransactionContext() { return null; } }; public List<IndexMaintainer> getIndexMaintainers(); - public Transaction getTransaction(); + public PhoenixTransactionContext getTransactionContext(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 9dc1e47413..5777e2f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -320,7 +320,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final Region dataRegion, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final TupleProjector projector, final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) { - RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment(), useNewValueColumnQualifier, encodingScheme); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java index 8693681..37fa2ab 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java @@ -17,12 +17,12 @@ */ package org.apache.phoenix.coprocessor; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.phoenix.transaction.TransactionFactory; public class PhoenixTransactionalProcessor extends DelegateRegionObserver { public PhoenixTransactionalProcessor() { - super(new TransactionProcessor()); + super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoProcessor()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index b006ef6..dda1a6d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.expression.OrderByExpression; + import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 229b51c..48235f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -108,6 +108,7 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ExpressionUtil; @@ -118,7 +119,6 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TimeKeeper; -import org.apache.tephra.TxConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -609,7 +609,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } mutations.add(delete); // force tephra to ignore this deletes - delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); } else if (isUpsert) { Arrays.fill(values, null); int bucketNumOffset = 0; @@ -676,7 +676,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver results.get(0).getRowLength()); delete.deleteColumns(deleteCF, deleteCQ, ts); // force tephra to ignore this deletes - delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); mutations.add(delete); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index cc37402..e17c0c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -85,6 +85,10 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; +import org.apache.phoenix.transaction.PhoenixTransactionalTable; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -95,17 +99,6 @@ import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TransactionUtil; -import org.apache.tephra.Transaction; -import org.apache.tephra.Transaction.VisibilityLevel; -import org.apache.tephra.TransactionAware; -import org.apache.tephra.TransactionCodec; -import org.apache.tephra.TransactionConflictException; -import org.apache.tephra.TransactionContext; -import org.apache.tephra.TransactionFailureException; -import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.hbase.TransactionAwareHTable; -import org.apache.tephra.visibility.FenceWait; -import org.apache.tephra.visibility.VisibilityFence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,10 +115,9 @@ import com.google.common.collect.Sets; */ public class MutationState implements SQLCloseable { private static final Logger logger = LoggerFactory.getLogger(MutationState.class); - private static final TransactionCodec CODEC = new TransactionCodec(); private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0]; private static final int MAX_COMMIT_RETRIES = 3; - + private final PhoenixConnection connection; private final long maxSize; private final long maxSizeBytes; @@ -133,48 +125,47 @@ public class MutationState implements SQLCloseable { private final long batchSizeBytes; private long batchCount = 0L; private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; - private final List<TransactionAware> txAwares; - private final TransactionContext txContext; private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10); - - private Transaction tx; + private long sizeOffset; private int numRows = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); - + + final PhoenixTransactionContext phoenixTransactionContext; + private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) { - this(maxSize, maxSizeBytes, connection, null, null); + this(maxSize,maxSizeBytes, connection, false, null); } - - public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, TransactionContext txContext) { - this(maxSize, maxSizeBytes, connection, null, txContext); + + public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, PhoenixTransactionContext txContext) { + this(maxSize, maxSizeBytes, connection, false, txContext); } - + public MutationState(MutationState mutationState) { - this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, mutationState.getTransaction(), null); + this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, true, mutationState.getPhoenixTransactionContext()); } - + public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, long sizeOffset) { - this(maxSize, maxSizeBytes, connection, null, null, sizeOffset); + this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); } - - private MutationState(long maxSize, long maxSizeBytes,PhoenixConnection connection, Transaction tx, TransactionContext txContext) { - this(maxSize, maxSizeBytes, connection, tx, txContext, 0); + + private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext) { + this(maxSize, maxSizeBytes, connection, subTask, txContext, 0); } - - private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) { - this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext); + + private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) { + this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext); this.sizeOffset = sizeOffset; } - - MutationState(long maxSize, long maxSizeBytes, - PhoenixConnection connection, - Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, Transaction tx, TransactionContext txContext) { + + MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, + boolean subTask, PhoenixTransactionContext txContext) { this.maxSize = maxSize; this.maxSizeBytes = maxSizeBytes; this.connection = connection; @@ -184,115 +175,65 @@ public class MutationState implements SQLCloseable { boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue() : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; - this.tx = tx; - if (tx == null) { - this.txAwares = Collections.emptyList(); + if (subTask == false) { if (txContext == null) { - TransactionSystemClient txServiceClient = this.connection - .getQueryServices().getTransactionSystemClient(); - this.txContext = new TransactionContext(txServiceClient); + phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection); } else { isExternalTxContext = true; - this.txContext = txContext; + phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask); } } else { // this code path is only used while running child scans, we can't pass the txContext to child scans // as it is not thread safe, so we use the tx member variable - this.txAwares = Lists.newArrayList(); - this.txContext = null; + phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask); } } public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { - this(maxSize, maxSizeBytes, connection, null, null, sizeOffset); + this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); this.mutations.put(table, mutations); this.numRows = mutations.size(); - this.tx = connection.getMutationState().getTransaction(); throwIfTooBig(); } - + public long getMaxSize() { return maxSize; } - + public long getMaxSizeBytes() { return maxSizeBytes; } - + + public PhoenixTransactionContext getPhoenixTransactionContext() { + return phoenixTransactionContext; + } + /** * Commit a write fence when creating an index so that we can detect * when a data table transaction is started before the create index * but completes after it. In this case, we need to rerun the data * table transaction after the index creation so that the index rows - * are generated. See {@link #addDMLFence(PTable)} and TEPHRA-157 + * are generated. See TEPHRA-157 * for more information. * @param dataTable the data table upon which an index is being added * @throws SQLException */ public void commitDDLFence(PTable dataTable) throws SQLException { if (dataTable.isTransactional()) { - byte[] key = dataTable.getName().getBytes(); - boolean success = false; try { - FenceWait fenceWait = VisibilityFence.prepareWait(key, connection.getQueryServices().getTransactionSystemClient()); - fenceWait.await(10000, TimeUnit.MILLISECONDS); - success = true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); - } catch (TimeoutException | TransactionFailureException e) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) - .setSchemaName(dataTable.getSchemaName().getString()) - .setTableName(dataTable.getTableName().getString()) - .build().buildException(); + phoenixTransactionContext.commitDDLFence(dataTable, logger); } finally { // The client expects a transaction to be in progress on the txContext while the // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's // finished, we start a new one here. // TODO: seems like an autonomous tx capability in Tephra would be useful here. - try { - txContext.start(); - if (logger.isInfoEnabled() && success) logger.info("Added write fence at ~" + getTransaction().getReadPointer()); - } catch (TransactionFailureException e) { - throw TransactionUtil.getTransactionFailureException(e); - } - } - } - } - - /** - * Add an entry to the change set representing the DML operation that is starting. - * These entries will not conflict with each other, but they will conflict with a - * DDL operation of creating an index. See {@link #addDMLFence(PTable)} and TEPHRA-157 - * for more information. - * @param table the table which is doing DML - * @throws SQLException - */ - private void addDMLFence(PTable table) throws SQLException { - if (table.getType() == PTableType.INDEX || !table.isTransactional()) { - return; - } - byte[] logicalKey = table.getName().getBytes(); - TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); - if (this.txContext == null) { - this.txAwares.add(logicalTxAware); - } else { - this.txContext.addTransactionAware(logicalTxAware); - } - byte[] physicalKey = table.getPhysicalName().getBytes(); - if (Bytes.compareTo(physicalKey, logicalKey) != 0) { - TransactionAware physicalTxAware = VisibilityFence.create(physicalKey); - if (this.txContext == null) { - this.txAwares.add(physicalTxAware); - } else { - this.txContext.addTransactionAware(physicalTxAware); + phoenixTransactionContext.begin(); } } } public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException { - Transaction currentTx = getTransaction(); - if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { + if (! phoenixTransactionContext.isTransactionRunning() || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { return false; } Set<TableRef> sources = plan.getSourceRefs(); @@ -332,118 +273,78 @@ public class MutationState implements SQLCloseable { break; } } + + phoenixTransactionContext.checkpoint(hasUncommittedData); + if (hasUncommittedData) { - try { - if (txContext == null) { - currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx); - } else { - txContext.checkpoint(); - currentTx = tx = txContext.getCurrentTransaction(); - } - // Since we've checkpointed, we can clear out uncommitted set, since a statement run afterwards - // should see all this data. - uncommittedPhysicalNames.clear(); - } catch (TransactionFailureException e) { - throw new SQLException(e); - } + uncommittedPhysicalNames.clear(); } - // Since we're querying our own table while mutating it, we must exclude - // see our current mutations, otherwise we can get erroneous results (for DELETE) - // or get into an infinite loop (for UPSERT SELECT). - currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); return true; } return false; } - - private void addTransactionParticipant(TransactionAware txAware) throws SQLException { - if (txContext == null) { - txAwares.add(txAware); - assert(tx != null); - txAware.startTx(tx); - } else { - txContext.addTransactionAware(txAware); - } - } - + // Though MutationState is not thread safe in general, this method should be because it may // be called by TableResultIterator in a multi-threaded manner. Since we do not want to expose // the Transaction outside of MutationState, this seems reasonable, as the member variables // would not change as these threads are running. public HTableInterface getHTable(PTable table) throws SQLException { HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); - Transaction currentTx; - if (table.isTransactional() && (currentTx=getTransaction()) != null) { - TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table.isImmutableRows()); + if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) { + PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table); // Using cloned mutationState as we may have started a new transaction already // if auto commit is true and we need to use the original one here. - txAware.startTx(currentTx); - htable = txAware; + htable = phoenixTransactionTable; } return htable; } - + public PhoenixConnection getConnection() { return connection; } - - // Kept private as the Transaction may change when check pointed. Keeping it private ensures - // no one holds on to a stale copy. - private Transaction getTransaction() { - return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null; - } - + public boolean isTransactionStarted() { - return getTransaction() != null; + return phoenixTransactionContext.isTransactionRunning(); } - + public long getInitialWritePointer() { - Transaction tx = getTransaction(); - return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getTransactionId(); // First write pointer - won't change with checkpointing + return phoenixTransactionContext.getTransactionId(); // First write pointer - won't change with checkpointing } - + // For testing public long getWritePointer() { - Transaction tx = getTransaction(); - return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer(); + return phoenixTransactionContext.getWritePointer(); } - + // For testing - public VisibilityLevel getVisibilityLevel() { - Transaction tx = getTransaction(); - return tx == null ? null : tx.getVisibilityLevel(); + public PhoenixVisibilityLevel getVisibilityLevel() { + return phoenixTransactionContext.getVisibilityLevel(); } - + public boolean startTransaction() throws SQLException { - if (txContext == null) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); - } - if (connection.getSCN() != null) { throw new SQLExceptionInfo.Builder( SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET) .build().buildException(); } - - try { - if (!isTransactionStarted()) { - // Clear any transactional state in case transaction was ended outside - // of Phoenix so we don't carry the old transaction state forward. We - // cannot call reset() here due to the case of having mutations and - // then transitioning from non transactional to transactional (which - // would end up clearing our uncommitted state). - resetTransactionalState(); - txContext.start(); - return true; - } - } catch (TransactionFailureException e) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException(); + + if (!isTransactionStarted()) { + // Clear any transactional state in case transaction was ended outside + // of Phoenix so we don't carry the old transaction state forward. We + // cannot call reset() here due to the case of having mutations and + // then transitioning from non transactional to transactional (which + // would end up clearing our uncommitted state). + resetTransactionalState(); + phoenixTransactionContext.begin(); + return true; } + return false; } + public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) { - MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null); + MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null); state.sizeOffset = 0; return state; } @@ -529,13 +430,9 @@ public class MutationState implements SQLCloseable { if (this == newMutationState) { // Doesn't make sense return; } - if (txContext != null) { - for (TransactionAware txAware : newMutationState.txAwares) { - txContext.addTransactionAware(txAware); - } - } else { - txAwares.addAll(newMutationState.txAwares); - } + + phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); + this.sizeOffset += newMutationState.sizeOffset; joinMutationState(newMutationState.mutations, this.mutations); if (!newMutationState.txMutations.isEmpty()) { @@ -552,7 +449,7 @@ public class MutationState implements SQLCloseable { } throwIfTooBig(); } - + private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) { RowKeySchema schema = table.getRowKeySchema(); @@ -1084,26 +981,16 @@ public class MutationState implements SQLCloseable { if (table.isTransactional()) { // Track tables to which we've sent uncommitted data txTableRefs.add(origTableRef); - addDMLFence(table); uncommittedPhysicalNames.add(table.getPhysicalName().getString()); - + // If we have indexes, wrap the HTable in a delegate HTable that // will attach the necessary index meta data in the event of a // rollback if (!table.getIndexes().isEmpty()) { hTable = new MetaDataAwareHTable(hTable, origTableRef); } - TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table.isImmutableRows()); - // Don't add immutable indexes (those are the only ones that would participate - // during a commit), as we don't need conflict detection for these. - if (tableInfo.isDataTable()) { - // Even for immutable, we need to do this so that an abort has the state - // necessary to generate the rows to delete. - addTransactionParticipant(txnAware); - } else { - txnAware.startTx(getTransaction()); - } - hTable = txnAware; + + hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table); } long numMutations = mutationList.size(); @@ -1209,15 +1096,11 @@ public class MutationState implements SQLCloseable { } public byte[] encodeTransaction() throws SQLException { - try { - return CODEC.encode(getTransaction()); - } catch (IOException e) { - throw new SQLException(e); - } + return phoenixTransactionContext.encodeTransaction(); } - - public static Transaction decodeTransaction(byte[] txnBytes) throws IOException { - return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes); + + public static PhoenixTransactionContext decodeTransaction(byte[] txnBytes) throws IOException { + return TransactionFactory.getTransactionFactory().getTransactionContext(txnBytes); } private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations, @@ -1294,29 +1177,22 @@ public class MutationState implements SQLCloseable { this.mutations.clear(); resetTransactionalState(); } - + private void resetTransactionalState() { - tx = null; - txAwares.clear(); + phoenixTransactionContext.reset(); txMutations = Collections.emptyMap(); uncommittedPhysicalNames.clear(); uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; } - + public void rollback() throws SQLException { try { - if (txContext != null && isTransactionStarted()) { - try { - txContext.abort(); - } catch (TransactionFailureException e) { - throw TransactionUtil.getTransactionFailureException(e); - } - } + phoenixTransactionContext.abort(); } finally { resetState(); } } - + public void commit() throws SQLException { Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); int retryCount = 0; @@ -1332,38 +1208,32 @@ public class MutationState implements SQLCloseable { sqlE = e; } finally { try { - if (txContext != null && isTransactionStarted()) { - TransactionFailureException txFailure = null; - boolean finishSuccessful=false; - try { - if (sendSuccessful) { - txContext.finish(); - finishSuccessful = true; - } - } catch (TransactionFailureException e) { - if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount); - retryCommit = (e instanceof TransactionConflictException && retryCount < MAX_COMMIT_RETRIES); - txFailure = e; - SQLException nextE = TransactionUtil.getTransactionFailureException(e); - if (sqlE == null) { - sqlE = nextE; - } else { - sqlE.setNextException(nextE); - } - } finally { - // If send fails or finish fails, abort the tx - if (!finishSuccessful) { - try { - txContext.abort(txFailure); - if (logger.isInfoEnabled()) logger.info("Abort successful"); - } catch (TransactionFailureException e) { - if (logger.isInfoEnabled()) logger.info("Abort failed with " + e); - SQLException nextE = TransactionUtil.getTransactionFailureException(e); - if (sqlE == null) { - sqlE = nextE; - } else { - sqlE.setNextException(nextE); - } + boolean finishSuccessful=false; + try { + if (sendSuccessful) { + phoenixTransactionContext.commit(); + finishSuccessful = true; + } + } catch (SQLException e) { + if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount); + retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode() && retryCount < MAX_COMMIT_RETRIES); + if (sqlE == null) { + sqlE = e; + } else { + sqlE.setNextException(e); + } + } finally { + // If send fails or finish fails, abort the tx + if (!finishSuccessful) { + try { + phoenixTransactionContext.abort(); + if (logger.isInfoEnabled()) logger.info("Abort successful"); + } catch (SQLException e) { + if (logger.isInfoEnabled()) logger.info("Abort failed with " + e); + if (sqlE == null) { + sqlE = e; + } else { + sqlE.setNextException(e); } } } @@ -1376,10 +1246,6 @@ public class MutationState implements SQLCloseable { startTransaction(); // Add back read fences Set<TableRef> txTableRefs = txMutations.keySet(); - for (TableRef tableRef : txTableRefs) { - PTable dataTable = tableRef.getTable(); - addDMLFence(dataTable); - } try { // Only retry if an index was added retryCommit = shouldResubmitTransaction(txTableRefs); @@ -1465,12 +1331,13 @@ public class MutationState implements SQLCloseable { * @throws SQLException */ public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException { - Transaction currentTx = getTransaction(); - if (currentTx != null) { + + if (phoenixTransactionContext.isTransactionRunning()) { // Initialize visibility so that transactions see their own writes. // The checkpoint() method will set it to not see writes if necessary. - currentTx.setVisibility(VisibilityLevel.SNAPSHOT); + phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT); } + Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){ @Override public boolean apply(TableRef tableRef) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index be22334..a7ea99a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -100,6 +100,7 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -108,7 +109,6 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; -import org.apache.tephra.TxConstants; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -1046,7 +1046,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor - || (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { + || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { nDeleteCF++; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 9edcafc..18b9edd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -24,15 +24,13 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; -import org.apache.tephra.Transaction; - import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; -import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; -import org.apache.phoenix.util.TransactionUtil; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.TransactionFactory; public class IndexMetaDataCacheFactory implements ServerCacheFactory { public IndexMetaDataCacheFactory() { @@ -49,11 +47,12 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { @Override public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { // just use the standard keyvalue builder - this doesn't really need to be fast + final List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer); - final Transaction txn; + final PhoenixTransactionContext txnContext; try { - txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null; + txnContext = txState.length != 0 ? TransactionFactory.getTransactionFactory().getTransactionContext(txState) : null; } catch (IOException e) { throw new SQLException(e); } @@ -70,8 +69,8 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { } @Override - public Transaction getTransaction() { - return txn; + public PhoenixTransactionContext getTransactionContext() { + return txnContext; } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index 39473dc..fa2fed2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -30,12 +30,12 @@ import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ServerUtil; -import org.apache.tephra.Transaction; public class PhoenixIndexMetaData implements IndexMetaData { private final Map<String, byte[]> attributes; @@ -56,7 +56,7 @@ public class PhoenixIndexMetaData implements IndexMetaData { byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); if (md != null) { final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto); - final Transaction txn = MutationState.decodeTransaction(txState); + final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState); return new IndexMetaDataCache() { @Override @@ -68,8 +68,8 @@ public class PhoenixIndexMetaData implements IndexMetaData { } @Override - public Transaction getTransaction() { - return txn; + public PhoenixTransactionContext getTransactionContext() { + return txnContext; } }; @@ -101,8 +101,8 @@ public class PhoenixIndexMetaData implements IndexMetaData { this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null; } - public Transaction getTransaction() { - return indexMetaDataCache.getTransaction(); + public PhoenixTransactionContext getTransactionContext() { + return indexMetaDataCache.getTransactionContext(); } public List<IndexMaintainer> getIndexMaintainers() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 563b79e..497c5ac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -75,14 +75,14 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; +import org.apache.phoenix.transaction.PhoenixTransactionalTable; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TransactionUtil; -import org.apache.tephra.Transaction; -import org.apache.tephra.Transaction.VisibilityLevel; -import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.TransactionAwareHTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -160,7 +160,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Map<String,byte[]> updateAttributes = m.getAttributesMap(); String tableName = c.getEnvironment().getRegion().getTableDesc().getNameAsString(); PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); - byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY); + byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY); Collection<Pair<Mutation, byte[]>> indexUpdates = null; // get the current span, or just use a null-span to avoid a bunch of if statements try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { @@ -249,14 +249,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException { - Transaction tx = indexMetaData.getTransaction(); - if (tx == null) { + PhoenixTransactionContext txnContext = indexMetaData.getTransactionContext(); + if (txnContext == null) { throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString()); } boolean isRollback = txRollbackAttribute!=null; boolean isImmutable = indexMetaData.isImmutableRows(); ResultScanner currentScanner = null; - TransactionAwareHTable txTable = null; + PhoenixTransactionalTable txTable = null; // Collect up all mutations in batch Map<ImmutableBytesPtr, MultiMutation> mutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); @@ -321,23 +321,22 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { scanRanges.initializeScan(scan); TableName tableName = env.getRegion().getRegionInfo().getTable(); HTableInterface htable = env.getTable(tableName); - txTable = new TransactionAwareHTable(htable); - txTable.startTx(tx); + txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(txnContext, htable); // For rollback, we need to see all versions, including // the last committed version as there may be multiple // checkpointed versions. SkipScanFilter filter = scanRanges.getSkipScanFilter(); if (isRollback) { filter = new SkipScanFilter(filter,true); - tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL); + txnContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL); } scan.setFilter(filter); currentScanner = txTable.getScanner(scan); } if (isRollback) { - processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations); + processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, txnContext, mutableColumns, indexUpdates, mutations); } else { - processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations, findPriorValueMutations); + processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, txnContext, mutableColumns, indexUpdates, mutations, findPriorValueMutations); } } finally { if (txTable != null) txTable.close(); @@ -360,7 +359,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private void processMutation(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, ResultScanner scanner, - Transaction tx, + PhoenixTransactionContext txnContext, Set<ColumnReference> upsertColumns, Collection<Pair<Mutation, byte[]>> indexUpdates, Map<ImmutableBytesPtr, MultiMutation> mutations, @@ -372,14 +371,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // Process existing data table rows by removing the old index row and adding the new index row while ((result = scanner.next()) != null) { Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); - TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result); + TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m, emptyColRef, result); generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); generatePuts(indexMetaData, indexUpdates, state); } } // Process new data table by adding new index rows for (Mutation m : mutations.values()) { - TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); + TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m); generatePuts(indexMetaData, indexUpdates, state); } } @@ -387,7 +386,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private void processRollback(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, ResultScanner scanner, - Transaction tx, Set<ColumnReference> mutableColumns, + PhoenixTransactionContext tx, Set<ColumnReference> mutableColumns, Collection<Pair<Mutation, byte[]>> indexUpdates, Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException { if (scanner != null) { @@ -474,7 +473,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); for (IndexUpdate delete : deletes) { if (delete.isValid()) { - delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); + delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java index 3e7a6ca..4c64a4f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java @@ -20,6 +20,7 @@ package org.apache.phoenix.iterate; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; @@ -54,11 +55,11 @@ import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; -import org.apache.tephra.Transaction; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -111,7 +112,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { Region dataRegion = null; IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; - Transaction tx = null; + PhoenixTransactionContext tx = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index c88727d..898a573 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -19,6 +19,7 @@ package org.apache.phoenix.iterate; import com.google.common.collect.ImmutableList; + import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -38,6 +39,7 @@ import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.*; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; @@ -95,7 +97,7 @@ public abstract class RegionScannerFactory { final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final Region dataRegion, final IndexMaintainer indexMaintainer, - Transaction tx, + PhoenixTransactionContext tx, final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, final TupleProjector projector, final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 42259b1..ccbd955 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -104,6 +104,7 @@ import org.apache.phoenix.schema.types.PUnsignedDate; import org.apache.phoenix.schema.types.PUnsignedTime; import org.apache.phoenix.schema.types.PUnsignedTimestamp; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.NumberUtil; @@ -113,7 +114,6 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.SchemaUtil; -import org.apache.tephra.TransactionContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -658,7 +658,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea mutationState.sendUncommitted(); } - public void setTransactionContext(TransactionContext txContext) throws SQLException { + public void setTransactionContext(PhoenixTransactionContext txContext) throws SQLException { if (!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT) .build().buildException(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 38580e4..45ab5fa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -46,7 +46,6 @@ import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; -import org.apache.tephra.TransactionSystemClient; public interface ConnectionQueryServices extends QueryServices, MetaDataMutated { @@ -132,7 +131,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public long clearCache() throws SQLException; public int getSequenceSaltBuckets(); - TransactionSystemClient getTransactionSystemClient(); public long getRenewLeaseThresholdMilliSeconds(); public boolean isRenewingLeasesEnabled();
