Repository: phoenix
Updated Branches:
  refs/heads/5.x-HBase-2.0 5d4cb8041 -> 15fa00fa5


http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index ce70dd9..dee02d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -25,42 +25,57 @@ import java.util.Map;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
-import org.apache.phoenix.transaction.TephraTransactionTable;
 import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.tephra.util.TxUtils;
 
 public class TransactionUtil {
+    // All transaction providers must use an empty byte array as the family 
delete marker
+    // (see TxConstants.FAMILY_DELETE_QUALIFIER)
+    public static final byte[] FAMILY_DELETE_MARKER = 
HConstants.EMPTY_BYTE_ARRAY;
+    // All transaction providers must multiply timestamps by this constant.
+    // (see TxConstants.MAX_TX_PER_MS)
+    public static final int MAX_TRANSACTIONS_PER_MILLISECOND = 1000000;
+    // Constant used to empirically determine if a timestamp is a 
transactional or
+    // non transactional timestamp (see TxUtils.MAX_NON_TX_TIMESTAMP)
+    private static final long MAX_NON_TX_TIMESTAMP = (long) 
(System.currentTimeMillis() * 1.1);
+    
     private TransactionUtil() {
+        
     }
     
     public static boolean isTransactionalTimestamp(long ts) {
-        return !TxUtils.isPreExistingVersion(ts);
+        return ts >= MAX_NON_TX_TIMESTAMP;
     }
     
     public static boolean isDelete(Cell cell) {
-        return (CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY));
+        return CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
     }
     
-    public static long convertToNanoseconds(long serverTimeStamp) {
-        return serverTimeStamp * 
TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
+    public static boolean isDeleteFamily(Cell cell) {
+        return CellUtil.matchingQualifier(cell, FAMILY_DELETE_MARKER) && 
CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
     }
     
-    public static long convertToMilliseconds(long serverTimeStamp) {
-        return serverTimeStamp / 
TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
+    private static Cell newDeleteFamilyMarker(byte[] row, byte[] family, long 
timestamp) {
+        return CellUtil.createCell(row, family, FAMILY_DELETE_MARKER, 
timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
     }
     
-    public static PhoenixTransactionalTable 
getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, 
Table htable, PTable pTable) {
-        return new TephraTransactionTable(phoenixTransactionContext, htable, 
pTable);
+    private static Cell newDeleteColumnMarker(byte[] row, byte[] family, 
byte[] qualifier, long timestamp) {
+        return CellUtil.createCell(row, family, qualifier, timestamp, 
KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    }
+
+    public static long convertToNanoseconds(long serverTimeStamp) {
+        return serverTimeStamp * MAX_TRANSACTIONS_PER_MILLISECOND;
+    }
+    
+    public static long convertToMilliseconds(long serverTimeStamp) {
+        return serverTimeStamp / MAX_TRANSACTIONS_PER_MILLISECOND;
     }
     
     // we resolve transactional tables at the txn read pointer
@@ -83,14 +98,14 @@ public class TransactionUtil {
                return  txInProgress ? 
convertToMilliseconds(mutationState.getInitialWritePointer()) : 
result.getMutationTime();
        }
 
-       public static Long getTableTimestamp(PhoenixConnection connection, 
boolean transactional) throws SQLException {
+       public static Long getTableTimestamp(PhoenixConnection connection, 
boolean transactional, TransactionFactory.Provider provider) throws 
SQLException {
                Long timestamp = null;
                if (!transactional) {
                        return timestamp;
                }
                MutationState mutationState = connection.getMutationState();
                if (!mutationState.isTransactionStarted()) {
-                       mutationState.startTransaction();
+                       mutationState.startTransaction(provider);
                }
                timestamp = 
convertToMilliseconds(mutationState.getInitialWritePointer());
                return timestamp;
@@ -108,7 +123,7 @@ public class TransactionUtil {
                         if (deleteMarker == null) {
                             deleteMarker = new Put(mutation.getRow());
                         }
-                        
deleteMarker.add(TransactionFactory.getTransactionProvider().newDeleteFamilyMarker(
+                        deleteMarker.add(newDeleteFamilyMarker(
                                 deleteMarker.getRow(), 
                                 family, 
                                 familyCells.get(0).getTimestamp()));
@@ -119,7 +134,7 @@ public class TransactionUtil {
                             if (deleteMarker == null) {
                                 deleteMarker = new Put(mutation.getRow());
                             }
-                            
deleteMarker.add(TransactionFactory.getTransactionProvider().newDeleteColumnMarker(
+                            deleteMarker.add(newDeleteColumnMarker(
                                     deleteMarker.getRow(),
                                     family,
                                     CellUtil.cloneQualifier(cell), 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 64045ae..d88a915 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -63,8 +63,8 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.EncodedCQCounter;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
@@ -261,7 +261,7 @@ public class CorrelatePlanTest {
                     PTableType.SUBQUERY, null, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                     null, null, columns, null, null, 
Collections.<PTable>emptyList(),
                     false, Collections.<PName>emptyList(), null, null, false, 
false, false, null,
-                    null, null, true, false, 0, 0L, Boolean.FALSE, null, 
false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, 
true);
+                    null, null, true, null, 0, 0L, Boolean.FALSE, null, false, 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, 
true);
             TableRef sourceTable = new TableRef(pTable);
             List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> 
newArrayList();
             for (PColumn column : sourceTable.getTable().getColumns()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
index 1a7132c..017e6c8 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
@@ -50,7 +50,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -58,11 +57,11 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.EncodedCQCounter;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.junit.Test;
@@ -183,7 +182,7 @@ public class LiteralResultIteratorPlanTest {
             PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, 
PName.EMPTY_NAME, PTableType.SUBQUERY, null,
                     MetaDataProtocol.MIN_TABLE_TIMESTAMP, 
PTable.INITIAL_SEQ_NUM, null, null, columns, null, null,
                     Collections.<PTable> emptyList(), false, 
Collections.<PName> emptyList(), null, null, false, false,
-                    false, null, null, null, true, false, 0, 0L, false, null, 
false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, 
true);
+                    false, null, null, null, true, null, 0, 0L, false, null, 
false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, 
true);
             TableRef sourceTable = new TableRef(pTable);
             List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> 
newArrayList();
             for (PColumn column : sourceTable.getTable().getColumns()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index f3674c6..7e0408e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -130,7 +130,6 @@ import 
org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ConfigUtil;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -168,7 +167,6 @@ public abstract class BaseTest {
     
     private static final Map<String,String> tableDDLMap;
     private static final Logger logger = 
LoggerFactory.getLogger(BaseTest.class);
-    protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
     @ClassRule
     public static TemporaryFolder tmpFolder = new TemporaryFolder();
     private static final int dropTableTimeout = 300; // 5 mins should be long 
enough.
@@ -414,18 +412,6 @@ public abstract class BaseTest {
         return url;
     }
     
-    private static void tearDownTxManager() throws SQLException {
-        
TransactionFactory.getTransactionProvider().getTransactionContext().tearDownTxManager();
-    }
-
-    protected static void setTxnConfigs() throws IOException {
-        
TransactionFactory.getTransactionProvider().getTransactionContext().setTxnConfigs(config,
 tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
-    }
-
-    protected static void setupTxManager() throws SQLException, IOException {
-        
TransactionFactory.getTransactionProvider().getTransactionContext().setupTxManager(config,
 getUrl());
-    }
-
     private static String checkClusterInitialized(ReadOnlyProps serverProps) 
throws Exception {
         if (!clusterInitialized) {
             url = setUpTestCluster(config, serverProps);
@@ -434,10 +420,6 @@ public abstract class BaseTest {
         return url;
     }
 
-    private static void checkTxManagerInitialized(ReadOnlyProps clientProps) 
throws SQLException, IOException {
-        setupTxManager();
-    }
-
     /**
      * Set up the test hbase cluster.
      * @return url to be used by clients to connect to the cluster.
@@ -476,11 +458,6 @@ public abstract class BaseTest {
         final HBaseTestingUtility u = utility;
         try {
             destroyDriver();
-            try {
-                tearDownTxManager();
-            } catch (Throwable t) {
-                logger.error("Exception caught when shutting down tx manager", 
t);
-            }
             utility = null;
             clusterInitialized = false;
         } finally {
@@ -519,9 +496,7 @@ public abstract class BaseTest {
     
     protected static void setUpTestDriver(ReadOnlyProps serverProps, 
ReadOnlyProps clientProps) throws Exception {
         if (driver == null) {
-            setTxnConfigs();
             String url = checkClusterInitialized(serverProps);
-            checkTxManagerInitialized(serverProps);
             driver = initAndRegisterTestDriver(url, clientProps);
         }
     }
@@ -593,6 +568,7 @@ public abstract class BaseTest {
         conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 
DEFAULT_RPC_SCHEDULER_FACTORY);
         conf.setLong(HConstants.ZK_SESSION_TIMEOUT, 10 * 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         conf.setLong(HConstants.ZOOKEEPER_TICK_TIME, 6 * 1000);
+        
         // override any defaults based on overrideProps
         for (Entry<String,String> entry : overrideProps) {
             conf.set(entry.getKey(), entry.getValue());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index c93e56e..a7569f7 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -20,9 +20,12 @@ package org.apache.phoenix.query;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY;
 import static org.apache.phoenix.query.QueryServicesOptions.withDefaults;
 
+import org.apache.curator.shaded.com.google.common.io.Files;
 import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.tephra.TxConstants;
+import org.apache.twill.internal.utils.Networks;
 
 
 /**
@@ -69,6 +72,7 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
      * because we want to control it's execution ourselves
      */
     public static final long DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY = 
Long.MAX_VALUE;
+    public static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
 
     
     /**
@@ -117,7 +121,16 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
                 .setHConnectionPoolMaxSize(DEFAULT_HCONNECTION_POOL_MAX_SIZE)
                 .setMaxThreadsPerHTable(DEFAULT_HTABLE_MAX_THREADS)
                 
.setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME)
-                
.setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY);
+                
.setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY)
+                // setup default configs for Tephra
+                .set(TxConstants.Manager.CFG_DO_PERSIST, false)
+                .set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, 
"n-times")
+                .set(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1)
+                .set(TxConstants.Service.CFG_DATA_TX_BIND_PORT, 
Networks.getRandomPort())
+                .set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, 
Files.createTempDir().getAbsolutePath())
+                .set(TxConstants.Manager.CFG_TX_TIMEOUT, 
DEFAULT_TXN_TIMEOUT_SECONDS)
+                .set(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L)
+                ;
     }
     
     public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps 
overrideProps) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 29d964b..4a56637 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -782,7 +782,7 @@ public class TestUtil {
         ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
         MutationState mutationState = pconn.getMutationState();
         if (table.isTransactional()) {
-            mutationState.startTransaction();
+            mutationState.startTransaction(table.getTransactionProvider());
         }
         try (Table htable = mutationState.getHTable(table)) {
             byte[] markerRowKey = Bytes.toBytes("TO_DELETE");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-protocol/src/main/PTable.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/PTable.proto 
b/phoenix-protocol/src/main/PTable.proto
index ba9e0b4..16381dd 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -100,6 +100,7 @@ message PTable {
   optional bytes encodingScheme = 35;
   repeated EncodedCQCounter encodedCQCounters = 36;
   optional bool useStatsForParallelization = 37;
+  optional int32 transactionProvider = 38;
 }
 
 message EncodedCQCounter {

Reply via email to