fix bugs

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f584e5f1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f584e5f1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f584e5f1

Branch: refs/heads/omid
Commit: f584e5f1a53cfaecb309ecf3201011a1579ebf47
Parents: fa69563
Author: Ohad Shacham <[email protected]>
Authored: Mon Mar 27 08:58:15 2017 +0300
Committer: Ohad Shacham <[email protected]>
Committed: Mon Mar 27 08:58:15 2017 +0300

----------------------------------------------------------------------
 .../apache/phoenix/execute/PartialCommitIT.java |   4 +-
 .../phoenix/tx/FlappingTransactionIT.java       |  27 ++--
 .../phoenix/cache/IndexMetaDataCache.java       |   7 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   4 +-
 .../phoenix/coprocessor/ScanRegionObserver.java |   5 +-
 .../apache/phoenix/execute/MutationState.java   |  51 +++-----
 .../index/IndexMetaDataCacheFactory.java        |  15 ++-
 .../phoenix/index/PhoenixIndexMetaData.java     |  14 +--
 .../index/PhoenixTransactionalIndexer.java      |  35 +++---
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   3 +-
 .../transaction/OmidTransactionContext.java     |  18 +++
 .../transaction/PhoenixTransactionContext.java  |  29 ++++-
 .../transaction/TephraTransactionContext.java   |  87 ++++++++++++-
 .../phoenix/transaction/TransactionFactory.java | 126 +++++++++++++++++++
 .../apache/phoenix/util/TransactionUtil.java    |   8 +-
 15 files changed, 325 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index a5555f3..636cd84 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -271,11 +271,11 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         return new PhoenixConnection(phxCon, null) {
             @Override
             protected MutationState newMutationState(int maxSize) {
-                return new MutationState(maxSize, this, mutations, null, null);
+                return new MutationState(maxSize, this, mutations, false, 
null);
             };
         };
     }
-    
+
     public static class FailingRegionObserver extends SimpleRegionObserver {
         @Override
         public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, 
Put put, WALEdit edit,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 5a990cf..d34f403 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -42,6 +42,9 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.apache.tephra.TransactionContext;
@@ -228,15 +231,17 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         }
 
         // Use HBase level Tephra APIs to start a new transaction
-        TransactionAwareHTable txAware = new TransactionAwareHTable(htable, 
TxConstants.ConflictDetection.ROW);
-        TransactionContext txContext = new TransactionContext(txServiceClient, 
txAware);
-        txContext.start();
-        
+        //TransactionAwareHTable txAware = new TransactionAwareHTable(htable, 
TxConstants.ConflictDetection.ROW);
+        PhoenixTransactionContext txContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+        PhoenixTransactionalTable txTable = 
TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, 
htable);
+
+        txContext.begin();
+
         // Use HBase APIs to add a new row
         Put put = new Put(Bytes.toBytes("z"));
         put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
         put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V1"), Bytes.toBytes("b"));
-        txAware.put(put);
+        txTable.put(put);
         
         // Use Phoenix APIs to add new row (sharing the transaction context)
         pconn.setTransactionContext(txContext);
@@ -259,7 +264,7 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         assertEquals(3,rs.getInt(1));
         
         // Use Tephra APIs directly to finish (i.e. commit) the transaction
-        txContext.finish();
+        txContext.commit();
         
         // Confirm that attempt to commit row with conflict fails
         try {
@@ -279,14 +284,16 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         }
         
         // Repeat the same as above, but this time abort the transaction
-        txContext = new TransactionContext(txServiceClient, txAware);
-        txContext.start();
+        txContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+        txTable = 
TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, 
htable);
+
+        txContext.begin();
         
         // Use HBase APIs to add a new row
         put = new Put(Bytes.toBytes("j"));
         put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
         put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V1"), Bytes.toBytes("e"));
-        txAware.put(put);
+        txTable.put(put);
         
         // Use Phoenix APIs to add new row (sharing the transaction context)
         pconn.setTransactionContext(txContext);
@@ -325,4 +332,4 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         assertTrue(result.isEmpty());
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index d22993c..16207c8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,9 +23,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.tephra.Transaction;
-
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 
 public interface IndexMetaDataCache extends Closeable {
     public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new 
IndexMetaDataCache() {
@@ -40,11 +39,11 @@ public interface IndexMetaDataCache extends Closeable {
         }
 
         @Override
-        public Transaction getTransaction() {
+        public PhoenixTransactionContext getTransactionContext() {
             return null;
         }
         
     };
     public List<IndexMaintainer> getIndexMaintainers();
-    public Transaction getTransaction();
+    public PhoenixTransactionContext getTransactionContext();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index f6bd512..321d117 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -315,7 +315,7 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
             final byte[][] viewConstants, final TupleProjector projector,
             final ImmutableBytesWritable ptr) {
         return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, 
tupleProjector,
-                dataRegion, indexMaintainer, null, viewConstants, null, null, 
projector, ptr);
+                dataRegion, indexMaintainer, viewConstants, null, null, 
projector, ptr);
     }
 
     /**
@@ -330,7 +330,6 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
      * @param tupleProjector
      * @param dataRegion
      * @param indexMaintainer
-     * @param tx current transaction
      * @param viewConstants
      */
     protected RegionScanner getWrappedScanner(final 
ObserverContext<RegionCoprocessorEnvironment> c,
@@ -338,7 +337,6 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
             final Expression[] arrayFuncRefs, final int offset, final Scan 
scan,
             final ColumnReference[] dataColumns, final TupleProjector 
tupleProjector,
             final Region dataRegion, final IndexMaintainer indexMaintainer,
-            Transaction tx, 
             final byte[][] viewConstants, final KeyValueSchema kvSchema,
             final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
             final ImmutableBytesWritable ptr) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ade88db..0e0e3ba 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -204,7 +204,6 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
         Region dataRegion = null;
         IndexMaintainer indexMaintainer = null;
         byte[][] viewConstants = null;
-        Transaction tx = null;
         ColumnReference[] dataColumns = 
IndexUtil.deserializeDataTableColumnsToJoin(scan);
         if (dataColumns != null) {
             tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -213,15 +212,13 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
             List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes);
             indexMaintainer = indexMaintainers.get(0);
             viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
-            byte[] txState = 
scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
-            tx = MutationState.decodeTransaction(txState);
         }
 
         final TupleProjector p = 
TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         innerScanner =
                 getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, 
offset, scan,
-                    dataColumns, tupleProjector, dataRegion, indexMaintainer, 
tx,
+                    dataColumns, tupleProjector, dataRegion, indexMaintainer,
                     viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : 
null, ptr);
 
         final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index c480e30..23c8b2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -87,7 +87,7 @@ import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
 import org.apache.phoenix.transaction.PhoenixTransactionalTable;
-import org.apache.phoenix.transaction.TephraTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -97,17 +97,6 @@ import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.Transaction.VisibilityLevel;
-import org.apache.tephra.TransactionAware;
-import org.apache.tephra.TransactionCodec;
-import org.apache.tephra.TransactionConflictException;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionFailureException;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.hbase.TransactionAwareHTable;
-import org.apache.tephra.visibility.FenceWait;
-import org.apache.tephra.visibility.VisibilityFence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,7 +113,6 @@ import com.google.common.collect.Sets;
  */
 public class MutationState implements SQLCloseable {
     private static final Logger logger = 
LoggerFactory.getLogger(MutationState.class);
-    private static final TransactionCodec CODEC = new TransactionCodec();
     private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
     private static final int MAX_COMMIT_RETRIES = 3;
 
@@ -183,20 +171,20 @@ public class MutationState implements SQLCloseable {
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
         if (subTask == false) {
             if (txContext == null) {
-                phoenixTransactionContext = new 
TephraTransactionContext(connection);
+                phoenixTransactionContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(connection);
             } else {
                 isExternalTxContext = true;
-                phoenixTransactionContext = new 
TephraTransactionContext(txContext, connection, subTask);
+                phoenixTransactionContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(txContext, 
connection, subTask);
             }
         } else {
             // this code path is only used while running child scans, we can't 
pass the txContext to child scans
             // as it is not thread safe, so we use the tx member variable
-            phoenixTransactionContext = new 
TephraTransactionContext(txContext, connection, subTask);
+            phoenixTransactionContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(txContext, 
connection, subTask);
         }
     }
 
     public MutationState(TableRef table, 
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, true, 
connection.getMutationState().getPhoenixTransactionContext(), sizeOffset);
+        this(maxSize, connection, false, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
         throwIfTooBig();
@@ -220,7 +208,7 @@ public class MutationState implements SQLCloseable {
      * @param dataTable the data table upon which an index is being added
      * @throws SQLException
      */
-    public void commitDDLFence(PTable dataTable, Logger logger) throws 
SQLException {
+    public void commitDDLFence(PTable dataTable) throws SQLException {
         if (dataTable.isTransactional()) {
             try {
                 phoenixTransactionContext.commitDDLFence(dataTable, logger);
@@ -305,13 +293,11 @@ public class MutationState implements SQLCloseable {
     // would not change as these threads are running.
     public HTableInterface getHTable(PTable table) throws SQLException {
         HTableInterface htable = 
this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
-        Transaction currentTx;
-        if (table.isTransactional() && (currentTx=getTransaction()) != null) {
-            TransactionAwareHTable txAware = 
TransactionUtil.getTransactionAwareHTable(htable, table.isImmutableRows());
+        if (table.isTransactional() && 
phoenixTransactionContext.isTransactionRunning()) {
+            PhoenixTransactionalTable phoenixTransactionTable = 
TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, 
table.isImmutableRows());
             // Using cloned mutationState as we may have started a new 
transaction already
             // if auto commit is true and we need to use the original one here.
-            txAware.startTx(currentTx);
-            htable = txAware;
+            htable = phoenixTransactionTable;
         }
         return htable;
     }
@@ -440,7 +426,7 @@ public class MutationState implements SQLCloseable {
             return;
         }
 
-        phoenixTransactionContext.join(getPhoenixTransactionContext());
+        
phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
 
         this.sizeOffset += newMutationState.sizeOffset;
         joinMutationState(newMutationState.mutations, this.mutations);
@@ -1090,17 +1076,9 @@ public class MutationState implements SQLCloseable {
     }
 
     public byte[] encodeTransaction() throws SQLException {
-        try {
-            return CODEC.encode(getTransaction());
-        } catch (IOException e) {
-            throw new SQLException(e);
-        }
+        return phoenixTransactionContext.encodeTransaction();
     }
     
-    public static Transaction decodeTransaction(byte[] txnBytes) throws 
IOException {
-        return (txnBytes == null || txnBytes.length==0) ? null : 
CODEC.decode(txnBytes);
-    }
-
     private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? 
extends Mutation> mutations,
             ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
         PTable table = tableRef.getTable();
@@ -1333,12 +1311,13 @@ public class MutationState implements SQLCloseable {
      * @throws SQLException
      */
     public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws 
SQLException {
-        Transaction currentTx = getTransaction();
-        if (currentTx != null) {
+
+        if (phoenixTransactionContext.isTransactionRunning()) {
             // Initialize visibility so that transactions see their own writes.
             // The checkpoint() method will set it to not see writes if 
necessary.
-            currentTx.setVisibility(VisibilityLevel.SNAPSHOT);
+            
phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT);
         }
+
         Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new 
Predicate<TableRef>(){
             @Override
             public boolean apply(TableRef tableRef) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 56849fe..8658524 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -24,15 +24,13 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
-import org.apache.tephra.Transaction;
-
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
-import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
-import org.apache.phoenix.util.TransactionUtil;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 
 public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     public IndexMetaDataCacheFactory() {
@@ -49,11 +47,12 @@ public class IndexMetaDataCacheFactory implements 
ServerCacheFactory {
     @Override
     public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] 
txState, final MemoryChunk chunk) throws SQLException {
         // just use the standard keyvalue builder - this doesn't really need 
to be fast
+        
         final List<IndexMaintainer> maintainers = 
                 IndexMaintainer.deserialize(cachePtr, 
GenericKeyValueBuilder.INSTANCE);
-        final Transaction txn;
+        final PhoenixTransactionContext txnContext;
         try {
-            txn = txState.length!=0 ? MutationState.decodeTransaction(txState) 
: null;
+            txnContext = txState.length != 0 ? 
TransactionFactory.getTransactionFactory().getTransactionContext(txState) : 
null;
         } catch (IOException e) {
             throw new SQLException(e);
         }
@@ -70,8 +69,8 @@ public class IndexMetaDataCacheFactory implements 
ServerCacheFactory {
             }
 
             @Override
-            public Transaction getTransaction() {
-                return txn;
+            public PhoenixTransactionContext getTransactionContext() {
+                return txnContext;
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index d22e957..82fe2f3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -30,12 +30,12 @@ import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.tephra.Transaction;
 
 public class PhoenixIndexMetaData implements IndexMetaData {
     private final Map<String, byte[]> attributes;
@@ -51,7 +51,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
             final List<IndexMaintainer> indexMaintainers = 
IndexMaintainer.deserialize(md);
-            final Transaction txn = MutationState.decodeTransaction(txState);
+            final PhoenixTransactionContext txnContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(txState);
             return new IndexMetaDataCache() {
 
                 @Override
@@ -63,8 +63,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
                 }
 
                 @Override
-                public Transaction getTransaction() {
-                    return txn;
+                public PhoenixTransactionContext getTransactionContext() {
+                    return txnContext;
                 }
 
             };
@@ -96,8 +96,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         this.ignoreNewerMutations = 
attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
     }
     
-    public Transaction getTransaction() {
-        return indexMetaDataCache.getTransaction();
+    public PhoenixTransactionContext getTransactionContext() {
+        return indexMetaDataCache.getTransactionContext();
     }
     
     public List<IndexMaintainer> getIndexMaintainers() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index fdf5498..a418c24 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -69,14 +69,14 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.Transaction.VisibilityLevel;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -149,7 +149,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
 
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
-        byte[] txRollbackAttribute = 
m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
+        byte[] txRollbackAttribute = 
m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
         // get the current span, or just use a null-span to avoid a bunch of 
if statements
         try (TraceScope scope = Trace.startSpan("Starting to build index 
updates")) {
@@ -186,14 +186,14 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
     }
     
     private Collection<Pair<Mutation, byte[]>> 
getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData 
indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) 
throws IOException {
-        Transaction tx = indexMetaData.getTransaction();
-        if (tx == null) {
+        PhoenixTransactionContext txnContext = 
indexMetaData.getTransactionContext();
+        if (txnContext == null) {
             throw new NullPointerException("Expected to find transaction in 
metadata for " + env.getRegionInfo().getTable().getNameAsString());
         }
         boolean isRollback = txRollbackAttribute!=null;
         boolean isImmutable = indexMetaData.isImmutableRows();
         ResultScanner currentScanner = null;
-        TransactionAwareHTable txTable = null;
+        PhoenixTransactionalTable txTable = null;
         // Collect up all mutations in batch
         Map<ImmutableBytesPtr, MultiMutation> mutations =
                 new HashMap<ImmutableBytesPtr, MultiMutation>();
@@ -250,23 +250,22 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 scanRanges.initializeScan(scan);
                 TableName tableName = 
env.getRegion().getRegionInfo().getTable();
                 HTableInterface htable = env.getTable(tableName);
-                txTable = new TransactionAwareHTable(htable);
-                txTable.startTx(tx);
+                txTable = 
TransactionFactory.getTransactionFactory().getTransactionalTable(txnContext, 
htable);
                 // For rollback, we need to see all versions, including
                 // the last committed version as there may be multiple
                 // checkpointed versions.
                 SkipScanFilter filter = scanRanges.getSkipScanFilter();
                 if (isRollback) {
                     filter = new SkipScanFilter(filter,true);
-                    tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL);
+                    
txnContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
                 }
                 scan.setFilter(filter);
                 currentScanner = txTable.getScanner(scan);
             }
             if (isRollback) {
-                processRollback(env, indexMetaData, txRollbackAttribute, 
currentScanner, tx, mutableColumns, indexUpdates, mutations);
+                processRollback(env, indexMetaData, txRollbackAttribute, 
currentScanner, txnContext, mutableColumns, indexUpdates, mutations);
             } else {
-                processMutation(env, indexMetaData, txRollbackAttribute, 
currentScanner, tx, mutableColumns, indexUpdates, mutations, 
findPriorValueMutations);
+                processMutation(env, indexMetaData, txRollbackAttribute, 
currentScanner, txnContext, mutableColumns, indexUpdates, mutations, 
findPriorValueMutations);
             }
         } finally {
             if (txTable != null) txTable.close();
@@ -289,7 +288,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
     private void processMutation(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
-            Transaction tx, 
+            PhoenixTransactionContext txnContext, 
             Set<ColumnReference> upsertColumns, 
             Collection<Pair<Mutation, byte[]>> indexUpdates,
             Map<ImmutableBytesPtr, MultiMutation> mutations,
@@ -300,14 +299,14 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             // Process existing data table rows by removing the old index row 
and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutationsToFindPreviousValue.remove(new 
ImmutableBytesPtr(result.getRow()));
-                TxTableState state = new TxTableState(env, upsertColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
+                TxTableState state = new TxTableState(env, upsertColumns, 
indexMetaData.getAttributes(), txnContext.getWritePointer(), m, emptyColRef, 
result);
                 generateDeletes(indexMetaData, indexUpdates, 
txRollbackAttribute, state);
                 generatePuts(indexMetaData, indexUpdates, state);
             }
         }
         // Process new data table by adding new index rows
         for (Mutation m : mutations.values()) {
-            TxTableState state = new TxTableState(env, upsertColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m);
+            TxTableState state = new TxTableState(env, upsertColumns, 
indexMetaData.getAttributes(), txnContext.getWritePointer(), m);
             generatePuts(indexMetaData, indexUpdates, state);
         }
     }
@@ -315,7 +314,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
     private void processRollback(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
-            Transaction tx, Set<ColumnReference> mutableColumns,
+            PhoenixTransactionContext tx, Set<ColumnReference> mutableColumns,
             Collection<Pair<Mutation, byte[]>> indexUpdates,
             Map<ImmutableBytesPtr, MultiMutation> mutations) throws 
IOException {
         if (scanner != null) {
@@ -402,7 +401,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, 
indexMetaData);
         for (IndexUpdate delete : deletes) {
             if (delete.isValid()) {
-                
delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
attribValue);
+                
delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY,
 attribValue);
                 indexUpdates.add(new Pair<Mutation, 
byte[]>(delete.getUpdate(),delete.getTableName()));
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index cb2390e..d387ab7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -103,6 +103,7 @@ import org.apache.phoenix.schema.types.PUnsignedDate;
 import org.apache.phoenix.schema.types.PUnsignedTime;
 import org.apache.phoenix.schema.types.PUnsignedTimestamp;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.NumberUtil;
@@ -629,7 +630,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         mutationState.sendUncommitted();
     }
         
-    public void setTransactionContext(TransactionContext txContext) throws 
SQLException {
+    public void setTransactionContext(PhoenixTransactionContext txContext) 
throws SQLException {
         if 
(!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, 
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT)
             .build().buildException();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 596cf73..8a4e284 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -85,4 +85,22 @@ public class OmidTransactionContext implements 
PhoenixTransactionContext {
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public byte[] encodeTransaction() throws SQLException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public long getMaxTransactionsPerSecond() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 2d0d5b7..bd63930 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -1,7 +1,6 @@
 package org.apache.phoenix.transaction;
 
 import org.apache.phoenix.schema.PTable;
-import org.apache.tephra.Transaction.VisibilityLevel;
 import org.slf4j.Logger;
 
 import java.sql.SQLException;
@@ -20,6 +19,8 @@ public interface PhoenixTransactionContext {
         SNAPSHOT_ALL
       }
 
+    public static final String TX_ROLLBACK_ATTRIBUTE_KEY = 
"phoenix.tx.rollback"; 
+
     /**
      * Starts a transaction
      *
@@ -87,20 +88,36 @@ public interface PhoenixTransactionContext {
     /**
      * Returns transaction unique identifier
      */
-    long getTransactionId();
+    public long getTransactionId();
 
     /**
      * Returns transaction snapshot id
      */
-    long getReadPointer();
+    public long getReadPointer();
 
     /**
      * Returns transaction write pointer. After checkpoint the write pointer 
is different than the initial one  
      */
-    long getWritePointer();
+    public long getWritePointer();
+
+    /**
+     * Set visibility level
+     */
+    public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel);
 
     /**
-     * Returns visibility level 
+     * Returns visibility level
+     */
+    public PhoenixVisibilityLevel getVisibilityLevel();
+
+    /**
+     * Encode transaction
+     */
+    public byte[] encodeTransaction() throws SQLException;
+
+    /**
+     * 
+     * @return max transactions per second
      */
-    PhoenixVisibilityLevel getVisibilityLevel();    
+    public long getMaxTransactionsPerSecond();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index f8096d5..cfa3ac3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -1,5 +1,6 @@
 package org.apache.phoenix.transaction;
 
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -11,14 +12,20 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.schema.PTable;
+import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.Transaction.VisibilityLevel;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.visibility.FenceWait;
 import org.apache.tephra.visibility.VisibilityFence;
 
@@ -28,12 +35,24 @@ import org.slf4j.Logger;
 
 public class TephraTransactionContext implements PhoenixTransactionContext {
 
+    private static final TransactionCodec CODEC = new TransactionCodec();
+
     private final List<TransactionAware> txAwares;
     private final TransactionContext txContext;
     private Transaction tx;
     private TransactionSystemClient txServiceClient;
     private TransactionFailureException e;
 
+    public TephraTransactionContext() {
+        this.txServiceClient = null;
+        this.txAwares = Lists.newArrayList();
+        this.txContext = null;
+    }
+
+    public TephraTransactionContext(byte[] txnBytes) throws IOException {
+        this();
+        this.tx = (txnBytes != null && txnBytes.length > 0) ? 
CODEC.decode(txnBytes) : null;
+    }
 
     public TephraTransactionContext(PhoenixConnection connection) {
         this.txServiceClient = 
connection.getQueryServices().getTransactionSystemClient();
@@ -65,6 +84,7 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
         }
 
+        System.out.println("BEGIN");
         try {
             txContext.start();
         } catch (TransactionFailureException e) {
@@ -150,6 +170,14 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         }
     }
 
+    private Transaction getCurrentTransaction() {
+        if (this.txContext != null) {
+            return this.txContext.getCurrentTransaction();
+        }
+
+        return this.tx;
+    }
+
     @Override
     public void commitDDLFence(PTable dataTable, Logger logger) throws 
SQLException {
         byte[] key = dataTable.getName().getBytes();
@@ -159,7 +187,7 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
             fenceWait.await(10000, TimeUnit.MILLISECONDS);
             
             if (logger.isInfoEnabled()) {
-                logger.info("Added write fence at ~" + 
getTransaction().getReadPointer());
+                logger.info("Added write fence at ~" + 
getCurrentTransaction().getReadPointer());
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -199,8 +227,6 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         assert(ctx instanceof TephraTransactionContext);
         TephraTransactionContext tephraContext = (TephraTransactionContext) 
ctx;
 
-        tephraContext.getAwares();
-
         if (txContext != null) {
             for (TransactionAware txAware : tephraContext.getAwares()) {
                 txContext.addTransactionAware(txAware);
@@ -269,6 +295,33 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         return HConstants.LATEST_TIMESTAMP;
     }
 
+    @Override
+    public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+        VisibilityLevel tephraVisibilityLevel = null;
+
+        switch(visibilityLevel) {
+        case SNAPSHOT:
+            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT;
+            break;
+        case SNAPSHOT_EXCLUDE_CURRENT:
+            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+            break;
+        case SNAPSHOT_ALL:
+            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
+            break;
+        default:
+            assert(false);               
+        }
+
+        if (this.txContext != null) {
+            
txContext.getCurrentTransaction().setVisibility(tephraVisibilityLevel);
+        } else if (tx != null) {
+            tx.setVisibility(tephraVisibilityLevel);
+        } else {
+            assert(false);
+        }
+    }
+    
     // For testing
     @Override
     public PhoenixVisibilityLevel getVisibilityLevel() {
@@ -297,7 +350,33 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         return phoenixVisibilityLevel;
     }
 
-   /**
+    @Override
+    public byte[] encodeTransaction() throws SQLException {
+
+        Transaction transaction = null;
+
+        if (this.txContext != null) {
+            transaction = txContext.getCurrentTransaction();
+        } else if (tx != null) {
+            transaction =  tx;
+        }
+
+        assert (transaction != null);
+
+        try {
+            return CODEC.encode(transaction);
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+    
+    @Override
+    public long getMaxTransactionsPerSecond() {
+        return TxConstants.MAX_TX_PER_MS;
+    }
+
+
+    /**
     * TephraTransactionContext specific functions
     */
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
new file mode 100644
index 0000000..ba80d02
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -0,0 +1,126 @@
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+public class TransactionFactory {
+
+    static private TransactionFactory transactionFactory = null;
+
+    private TransactionProcessor tp = TransactionProcessor.Tephra;
+
+    enum TransactionProcessor {
+        Tephra,
+        Omid
+    }
+
+    private TransactionFactory(TransactionProcessor tp) {
+        this.tp = tp;
+    }
+
+    static public void createTransactionFactory(TransactionProcessor tp) {
+        if (transactionFactory == null) {
+            transactionFactory = new TransactionFactory(tp);
+        }
+    }
+
+    static public TransactionFactory getTransactionFactory() {
+        if (transactionFactory == null) {
+            createTransactionFactory(TransactionProcessor.Tephra);
+        }
+
+        return transactionFactory;
+    }
+
+    public PhoenixTransactionContext getTransactionContext()  {
+
+        PhoenixTransactionContext ctx = null;
+
+        switch(tp) {
+        case Tephra:
+            ctx = new TephraTransactionContext();
+            break;
+        case Omid:
+            ctx = new OmidTransactionContext();
+            break;
+        default:
+            ctx = null;
+        }
+        
+        return ctx;
+    }
+
+    public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) 
throws IOException {
+
+        PhoenixTransactionContext ctx = null;
+
+        switch(tp) {
+        case Tephra:
+            ctx = new TephraTransactionContext(txnBytes);
+            break;
+        case Omid:
+//            ctx = new OmidTransactionContext(txnBytes);
+            break;
+        default:
+            ctx = null;
+        }
+        
+        return ctx;
+    }
+    
+    public PhoenixTransactionContext getTransactionContext(PhoenixConnection 
connection) {
+
+        PhoenixTransactionContext ctx = null;
+
+        switch(tp) {
+        case Tephra:
+            ctx = new TephraTransactionContext(connection);
+            break;
+        case Omid:
+//            ctx = new OmidTransactionContext(connection);
+            break;
+        default:
+            ctx = null;
+        }
+        
+        return ctx;
+    }
+
+    public PhoenixTransactionContext 
getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection 
connection, boolean subTask) {
+
+        PhoenixTransactionContext ctx = null;
+
+        switch(tp) {
+        case Tephra:
+            ctx = new TephraTransactionContext(contex, connection, subTask);
+            break;
+        case Omid:
+//            ctx = new OmidTransactionContext(contex, connection, subTask);
+            break;
+        default:
+            ctx = null;
+        }
+        
+        return ctx;
+    }
+
+    public PhoenixTransactionalTable 
getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
+
+        PhoenixTransactionalTable table = null;
+
+        switch(tp) {
+        case Tephra:
+            table = new TephraTransactionTable(ctx, htable);
+            break;
+        case Omid:
+//            table = new OmidTransactionContext(contex, connection, subTask);
+            break;
+        default:
+            table = null;
+        }
+        
+        return table;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f584e5f1/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 4fbbe57..94a56b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -32,10 +32,8 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TephraTransactionTable;
-import org.apache.tephra.TransactionConflictException;
-import org.apache.tephra.TransactionFailureException;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
 
 public class TransactionUtil {
     private TransactionUtil() {
@@ -46,11 +44,11 @@ public class TransactionUtil {
     }
     
     public static long convertToNanoseconds(long serverTimeStamp) {
-        return serverTimeStamp * TxConstants.MAX_TX_PER_MS;
+        return serverTimeStamp * 
TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond();
     }
     
     public static long convertToMilliseconds(long serverTimeStamp) {
-        return serverTimeStamp / TxConstants.MAX_TX_PER_MS;
+        return serverTimeStamp / 
TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond();
     }
     
     public static PhoenixTransactionalTable 
getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, 
HTableInterface htable, boolean isImmutableRows) {

Reply via email to