Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 22ceb4167 -> b8c42054e


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
deleted file mode 100644
index cf48521..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
-public class TephraTransactionTable implements PhoenixTransactionalTable {
-
-    private TransactionAwareHTable transactionAwareHTable;
-
-    private TephraTransactionContext tephraTransactionContext;
-
-    public TephraTransactionTable(PhoenixTransactionContext ctx, 
HTableInterface hTable) {
-        this(ctx, hTable, null);
-    }
-
-    public TephraTransactionTable(PhoenixTransactionContext ctx, 
HTableInterface hTable, PTable pTable) {
-
-        assert(ctx instanceof TephraTransactionContext);
-
-        tephraTransactionContext = (TephraTransactionContext) ctx;
-
-        transactionAwareHTable = new TransactionAwareHTable(hTable, (pTable != 
null && pTable.isImmutableRows()) ? TxConstants.ConflictDetection.NONE : 
TxConstants.ConflictDetection.ROW);
-
-        tephraTransactionContext.addTransactionAware(transactionAwareHTable);
-
-        if (pTable != null && pTable.getType() != PTableType.INDEX) {
-            tephraTransactionContext.markDMLFence(pTable);
-        }
-    }
-
-    @Override
-    public Result get(Get get) throws IOException {
-        return transactionAwareHTable.get(get);
-    }
-
-    @Override
-    public void put(Put put) throws IOException {
-        transactionAwareHTable.put(put);
-    }
-
-    @Override
-    public void delete(Delete delete) throws IOException {
-        transactionAwareHTable.delete(delete);
-    }
-
-    @Override
-    public ResultScanner getScanner(Scan scan) throws IOException {
-        return transactionAwareHTable.getScanner(scan);
-    }
-
-    @Override
-    public byte[] getTableName() {
-        return transactionAwareHTable.getTableName();
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-        return transactionAwareHTable.getConfiguration();
-    }
-
-    @Override
-    public HTableDescriptor getTableDescriptor() throws IOException {
-        return transactionAwareHTable.getTableDescriptor();
-    }
-
-    @Override
-    public boolean exists(Get get) throws IOException {
-        return transactionAwareHTable.exists(get);
-    }
-
-    @Override
-    public Result[] get(List<Get> gets) throws IOException {
-        return transactionAwareHTable.get(gets);
-    }
-
-    @Override
-    public ResultScanner getScanner(byte[] family) throws IOException {
-        return transactionAwareHTable.getScanner(family);
-    }
-
-    @Override
-    public ResultScanner getScanner(byte[] family, byte[] qualifier)
-            throws IOException {
-        return transactionAwareHTable.getScanner(family, qualifier);
-    }
-
-    @Override
-    public void put(List<Put> puts) throws IOException {
-        transactionAwareHTable.put(puts);
-    }
-
-    @Override
-    public void delete(List<Delete> deletes) throws IOException {
-        transactionAwareHTable.delete(deletes);
-    }
-
-    @Override
-    public void setAutoFlush(boolean autoFlush) {
-        transactionAwareHTable.setAutoFlush(autoFlush);
-    }
-
-    @Override
-    public boolean isAutoFlush() {
-        return transactionAwareHTable.isAutoFlush();
-    }
-
-    @Override
-    public long getWriteBufferSize() {
-        return transactionAwareHTable.getWriteBufferSize();
-    }
-
-    @Override
-    public void setWriteBufferSize(long writeBufferSize) throws IOException {
-        transactionAwareHTable.setWriteBufferSize(writeBufferSize);
-    }
-
-    @Override
-    public void flushCommits() throws IOException {
-        transactionAwareHTable.flushCommits();
-    }
-
-    @Override
-    public void close() throws IOException {
-        transactionAwareHTable.close();
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family,
-            byte[] qualifier, long amount, boolean writeToWAL)
-            throws IOException {
-        return transactionAwareHTable.incrementColumnValue(row, family, 
qualifier, amount, writeToWAL);
-    }
-
-    @Override
-    public Boolean[] exists(List<Get> gets) throws IOException {
-        return transactionAwareHTable.exists(gets);
-    }
-
-    @Override
-    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-        transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail);
-    }
-
-    @Override
-    public void setAutoFlushTo(boolean autoFlush) {
-        transactionAwareHTable.setAutoFlush(autoFlush);
-    }
-
-    @Override
-    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException 
{
-        return transactionAwareHTable.getRowOrBefore(row, family);
-    }
-
-    @Override
-    public TableName getName() {
-        return transactionAwareHTable.getName();
-    }
-
-    @Override
-    public boolean[] existsAll(List<Get> gets) throws IOException {
-        return transactionAwareHTable.existsAll(gets);
-    }
-
-    @Override
-    public void batch(List<? extends Row> actions, Object[] results)
-            throws IOException, InterruptedException {
-        transactionAwareHTable.batch(actions, results);
-    }
-
-    @Override
-    public Object[] batch(List<? extends Row> actions) throws IOException,
-            InterruptedException {
-        return transactionAwareHTable.batch(actions);
-    }
-
-    @Override
-    public <R> void batchCallback(List<? extends Row> actions,
-            Object[] results, Callback<R> callback) throws IOException,
-            InterruptedException {
-        transactionAwareHTable.batchCallback(actions, results, callback);
-    }
-
-    @Override
-    public <R> Object[] batchCallback(List<? extends Row> actions,
-            Callback<R> callback) throws IOException, InterruptedException {
-        return transactionAwareHTable.batchCallback(actions, callback);
-    }
-
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-            byte[] value, Put put) throws IOException {
-        return transactionAwareHTable.checkAndPut(row, family, qualifier, 
value, put);
-    }
-
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-            CompareOp compareOp, byte[] value, Put put) throws IOException {
-        return transactionAwareHTable.checkAndPut(row, family, qualifier, 
compareOp, value, put);
-    }
-
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-            byte[] value, Delete delete) throws IOException {
-        return transactionAwareHTable.checkAndDelete(row, family, qualifier, 
value, delete);
-    }
-
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-            CompareOp compareOp, byte[] value, Delete delete)
-            throws IOException {
-        return transactionAwareHTable.checkAndDelete(row, family, qualifier, 
compareOp, value, delete);
-    }
-
-    @Override
-    public void mutateRow(RowMutations rm) throws IOException {
-        transactionAwareHTable.mutateRow(rm);
-    }
-
-    @Override
-    public Result append(Append append) throws IOException {
-        return transactionAwareHTable.append(append);
-    }
-
-    @Override
-    public Result increment(Increment increment) throws IOException {
-        return transactionAwareHTable.increment(increment);
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family,
-            byte[] qualifier, long amount) throws IOException {
-        return transactionAwareHTable.incrementColumnValue(row, family, 
qualifier, amount);
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family,
-            byte[] qualifier, long amount, Durability durability)
-            throws IOException {
-        return transactionAwareHTable.incrementColumnValue(row, family, 
qualifier, amount, durability);
-    }
-
-    @Override
-    public CoprocessorRpcChannel coprocessorService(byte[] row) {
-        return transactionAwareHTable.coprocessorService(row);
-    }
-
-    @Override
-    public <T extends Service, R> Map<byte[], R> coprocessorService(
-            Class<T> service, byte[] startKey, byte[] endKey,
-            Call<T, R> callable) throws ServiceException, Throwable {
-        return transactionAwareHTable.coprocessorService(service, startKey, 
endKey, callable);
-    }
-
-    @Override
-    public <T extends Service, R> void coprocessorService(Class<T> service,
-            byte[] startKey, byte[] endKey, Call<T, R> callable,
-            Callback<R> callback) throws ServiceException, Throwable {
-        transactionAwareHTable.coprocessorService(service, startKey, endKey, 
callable, callback);
-    }
-
-    @Override
-    public <R extends Message> Map<byte[], R> batchCoprocessorService(
-            MethodDescriptor methodDescriptor, Message request,
-            byte[] startKey, byte[] endKey, R responsePrototype)
-            throws ServiceException, Throwable {
-        return 
transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, 
startKey, endKey, responsePrototype);
-    }
-
-    @Override
-    public <R extends Message> void batchCoprocessorService(
-            MethodDescriptor methodDescriptor, Message request,
-            byte[] startKey, byte[] endKey, R responsePrototype,
-            Callback<R> callback) throws ServiceException, Throwable {
-        transactionAwareHTable.batchCoprocessorService(methodDescriptor, 
request, startKey, endKey, responsePrototype, callback);
-    }
-
-    @Override
-    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
-            CompareOp compareOp, byte[] value, RowMutations mutation)
-            throws IOException {
-        return transactionAwareHTable.checkAndMutate(row, family, qualifier, 
compareOp, value, mutation);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index f32764b..62bd808 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -17,24 +17,55 @@
  */
 package org.apache.phoenix.transaction;
 
+import java.io.IOException;
+
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+
+
 public class TransactionFactory {
-    enum TransactionProcessor {
-        Tephra,
-        Omid
+    public enum Provider {
+        TEPHRA((byte)1, TephraTransactionProvider.getInstance()),
+        OMID((byte)2, OmidTransactionProvider.getInstance());
+        
+        private final byte code;
+        private final PhoenixTransactionProvider provider;
+        
+        Provider(byte code, PhoenixTransactionProvider provider) {
+            this.code = code;
+            this.provider = provider;
+        }
+        
+        public byte getCode() {
+            return this.code;
+        }
+
+        public static Provider fromCode(int code) {
+            if (code < 1 || code > Provider.values().length) {
+                throw new IllegalArgumentException("Invalid 
TransactionFactory.Provider " + code);
+            }
+            return Provider.values()[code-1];
+        }
+        
+        public static Provider getDefault() {
+            return TEPHRA;
+        }
+
+        public PhoenixTransactionProvider getTransactionProvider()  {
+            return provider;
+        }
     }
 
-    static public TransactionProvider getTransactionProvider() {
-        return TephraTransactionProvider.getInstance();
+    public static PhoenixTransactionProvider getTransactionProvider(Provider 
provider) {
+        return provider.getTransactionProvider();
     }
     
-    static public TransactionProvider 
getTransactionProvider(TransactionProcessor processor) {
-        switch (processor) {
-        case Tephra:
-            return TephraTransactionProvider.getInstance();
-        case Omid:
-            return OmidTransactionProvider.getInstance();
-        default:
-            throw new IllegalArgumentException("Unknown transaction processor: 
" + processor);
+    public static PhoenixTransactionContext getTransactionContext(byte[] 
txState, int clientVersion) throws IOException {
+        if (txState == null || txState.length == 0) {
+            return null;
         }
+        Provider provider = (clientVersion < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) 
+                ? Provider.OMID
+                : Provider.fromCode(txState[txState.length-1]);
+        return 
provider.getTransactionProvider().getTransactionContext(txState);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
deleted file mode 100644
index a5704f1..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-
-public interface TransactionProvider {
-    public PhoenixTransactionContext getTransactionContext();
-    public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) 
throws IOException;
-    public PhoenixTransactionContext getTransactionContext(PhoenixConnection 
connection);
-    public PhoenixTransactionContext 
getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection 
connection, boolean subTask);
-    
-    public PhoenixTransactionalTable 
getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable);
-    
-    public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long 
timestamp);
-    public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] 
qualifier, long timestamp);
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 1c25c33..6cf6e56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -88,7 +88,6 @@ import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.transaction.TransactionFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -1515,7 +1514,7 @@ public class PhoenixRuntime {
      * @return wall clock time in milliseconds (i.e. Epoch time) of a given 
Cell time stamp.
      */
     public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) {
-        return 
TransactionFactory.getTransactionProvider().getTransactionContext().isPreExistingVersion(tsOfCell)
 ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
+        return TransactionUtil.isTransactionalTimestamp(tsOfCell) ? 
TransactionUtil.convertToMilliseconds(tsOfCell) : tsOfCell;
     }
 
     public static long getCurrentScn(ReadOnlyProps props) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index dd885fd..996e1dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.filter.DistinctPrefixFilter;
 import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
@@ -88,6 +89,8 @@ import com.google.common.collect.Lists;
  */
 public class ScanUtil {
     public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1];
+    public static final int UNKNOWN_CLIENT_VERSION = 
VersionUtil.encodeVersion(4, 4, 0);
+
     /*
      * Max length that we fill our key when we turn an inclusive key
      * into a exclusive key.
@@ -930,5 +933,17 @@ public class ScanUtil {
     public static boolean isIndexRebuild(Scan scan) {
         return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) 
!= null;
     }
+ 
+    public static int getClientVersion(Scan scan) {
+        int clientVersion = UNKNOWN_CLIENT_VERSION;
+        byte[] clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        if (clientVersionBytes != null) {
+            clientVersion = Bytes.toInt(clientVersionBytes);
+        }
+        return clientVersion;
+    }
     
+    public static void setClientVersion(Scan scan, int version) {
+        scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, 
Bytes.toBytes(version));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index ab76ffe..7d6dfd4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -17,44 +17,65 @@
  */
 package org.apache.phoenix.util;
 
+import java.io.IOException;
 import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
-import org.apache.phoenix.transaction.TephraTransactionTable;
 import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.tephra.util.TxUtils;
 
 public class TransactionUtil {
+    // All transaction providers must use an empty byte array as the family 
delete marker
+    // (see TxConstants.FAMILY_DELETE_QUALIFIER)
+    public static final byte[] FAMILY_DELETE_MARKER = 
HConstants.EMPTY_BYTE_ARRAY;
+    // All transaction providers must multiply timestamps by this constant.
+    // (see TxConstants.MAX_TX_PER_MS)
+    public static final int MAX_TRANSACTIONS_PER_MILLISECOND = 1000000;
+    // Constant used to empirically determine if a timestamp is a 
transactional or
+    // non transactional timestamp (see TxUtils.MAX_NON_TX_TIMESTAMP)
+    private static final long MAX_NON_TX_TIMESTAMP = (long) 
(System.currentTimeMillis() * 1.1);
+    
     private TransactionUtil() {
+        
     }
     
     public static boolean isTransactionalTimestamp(long ts) {
-        return !TxUtils.isPreExistingVersion(ts);
+        return ts >= MAX_NON_TX_TIMESTAMP;
     }
     
     public static boolean isDelete(Cell cell) {
-        return (CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY));
+        return CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
     }
     
-    public static long convertToNanoseconds(long serverTimeStamp) {
-        return serverTimeStamp * 
TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
+    public static boolean isDeleteFamily(Cell cell) {
+        return CellUtil.matchingQualifier(cell, FAMILY_DELETE_MARKER) && 
CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
     }
     
-    public static long convertToMilliseconds(long serverTimeStamp) {
-        return serverTimeStamp / 
TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
+    private static Cell newDeleteFamilyMarker(byte[] row, byte[] family, long 
timestamp) {
+        return CellUtil.createCell(row, family, FAMILY_DELETE_MARKER, 
timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    }
+    
+    private static Cell newDeleteColumnMarker(byte[] row, byte[] family, 
byte[] qualifier, long timestamp) {
+        return CellUtil.createCell(row, family, qualifier, timestamp, 
KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    }
+
+    public static long convertToNanoseconds(long serverTimeStamp) {
+        return serverTimeStamp * MAX_TRANSACTIONS_PER_MILLISECOND;
     }
     
-    public static PhoenixTransactionalTable 
getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, 
HTableInterface htable, PTable pTable) {
-        return new TephraTransactionTable(phoenixTransactionContext, htable, 
pTable);
+    public static long convertToMilliseconds(long serverTimeStamp) {
+        return serverTimeStamp / MAX_TRANSACTIONS_PER_MILLISECOND;
     }
     
     // we resolve transactional tables at the txn read pointer
@@ -77,16 +98,58 @@ public class TransactionUtil {
                return  txInProgress ? 
convertToMilliseconds(mutationState.getInitialWritePointer()) : 
result.getMutationTime();
        }
 
-       public static Long getTableTimestamp(PhoenixConnection connection, 
boolean transactional) throws SQLException {
+       public static Long getTableTimestamp(PhoenixConnection connection, 
boolean transactional, TransactionFactory.Provider provider) throws 
SQLException {
                Long timestamp = null;
                if (!transactional) {
                        return timestamp;
                }
                MutationState mutationState = connection.getMutationState();
                if (!mutationState.isTransactionStarted()) {
-                       mutationState.startTransaction();
+                       mutationState.startTransaction(provider);
                }
                timestamp = 
convertToMilliseconds(mutationState.getInitialWritePointer());
                return timestamp;
        }
+       
+    // Convert HBase Delete into Put so that it can be undone if transaction 
is rolled back
+    public static Mutation convertIfDelete(Mutation mutation) throws 
IOException {
+        if (mutation instanceof Delete) {
+            Put deleteMarker = null;
+            for (Map.Entry<byte[],List<Cell>> entry : 
mutation.getFamilyCellMap().entrySet()) {
+                byte[] family = entry.getKey();
+                List<Cell> familyCells = entry.getValue();
+                if (familyCells.size() == 1) {
+                    if (CellUtil.isDeleteFamily(familyCells.get(0))) {
+                        if (deleteMarker == null) {
+                            deleteMarker = new Put(mutation.getRow());
+                        }
+                        deleteMarker.add(newDeleteFamilyMarker(
+                                deleteMarker.getRow(), 
+                                family, 
+                                familyCells.get(0).getTimestamp()));
+                    }
+                } else {
+                    for (Cell cell : familyCells) {
+                        if (CellUtil.isDeleteColumns(cell)) {
+                            if (deleteMarker == null) {
+                                deleteMarker = new Put(mutation.getRow());
+                            }
+                            deleteMarker.add(newDeleteColumnMarker(
+                                    deleteMarker.getRow(),
+                                    family,
+                                    CellUtil.cloneQualifier(cell), 
+                                    cell.getTimestamp()));
+                        }
+                    }
+                }
+            }
+            if (deleteMarker != null) {
+                for (Map.Entry<String, byte[]> entry : 
mutation.getAttributesMap().entrySet()) {
+                    deleteMarker.setAttribute(entry.getKey(), 
entry.getValue());
+                }
+                mutation = deleteMarker;
+            }
+        }
+        return mutation;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 76757b0..d88a915 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -56,7 +56,6 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -64,8 +63,8 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.EncodedCQCounter;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
@@ -262,7 +261,7 @@ public class CorrelatePlanTest {
                     PTableType.SUBQUERY, null, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                     null, null, columns, null, null, 
Collections.<PTable>emptyList(),
                     false, Collections.<PName>emptyList(), null, null, false, 
false, false, null,
-                    null, null, true, false, 0, 0L, Boolean.FALSE, null, 
false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, 
true);
+                    null, null, true, null, 0, 0L, Boolean.FALSE, null, false, 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, 
true);
             TableRef sourceTable = new TableRef(pTable);
             List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> 
newArrayList();
             for (PColumn column : sourceTable.getTable().getColumns()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
index 1a7132c..017e6c8 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
@@ -50,7 +50,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -58,11 +57,11 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.EncodedCQCounter;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.junit.Test;
@@ -183,7 +182,7 @@ public class LiteralResultIteratorPlanTest {
             PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, 
PName.EMPTY_NAME, PTableType.SUBQUERY, null,
                     MetaDataProtocol.MIN_TABLE_TIMESTAMP, 
PTable.INITIAL_SEQ_NUM, null, null, columns, null, null,
                     Collections.<PTable> emptyList(), false, 
Collections.<PName> emptyList(), null, null, false, false,
-                    false, null, null, null, true, false, 0, 0L, false, null, 
false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, 
true);
+                    false, null, null, null, true, null, 0, 0L, false, null, 
false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, 
true);
             TableRef sourceTable = new TableRef(pTable);
             List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> 
newArrayList();
             for (PColumn column : sourceTable.getTable().getColumns()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 18b5bf8..0ea63e7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -130,7 +130,6 @@ import 
org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ConfigUtil;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -168,7 +167,6 @@ public abstract class BaseTest {
     
     private static final Map<String,String> tableDDLMap;
     private static final Logger logger = 
LoggerFactory.getLogger(BaseTest.class);
-    protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
     @ClassRule
     public static TemporaryFolder tmpFolder = new TemporaryFolder();
     private static final int dropTableTimeout = 300; // 5 mins should be long 
enough.
@@ -414,18 +412,6 @@ public abstract class BaseTest {
         return url;
     }
     
-    private static void tearDownTxManager() throws SQLException {
-        
TransactionFactory.getTransactionProvider().getTransactionContext().tearDownTxManager();
-    }
-
-    protected static void setTxnConfigs() throws IOException {
-        
TransactionFactory.getTransactionProvider().getTransactionContext().setTxnConfigs(config,
 tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
-    }
-
-    protected static void setupTxManager() throws SQLException, IOException {
-        
TransactionFactory.getTransactionProvider().getTransactionContext().setupTxManager(config,
 getUrl());
-    }
-
     private static String checkClusterInitialized(ReadOnlyProps serverProps) 
throws Exception {
         if (!clusterInitialized) {
             url = setUpTestCluster(config, serverProps);
@@ -434,10 +420,6 @@ public abstract class BaseTest {
         return url;
     }
 
-    private static void checkTxManagerInitialized(ReadOnlyProps clientProps) 
throws SQLException, IOException {
-        setupTxManager();
-    }
-
     /**
      * Set up the test hbase cluster.
      * @return url to be used by clients to connect to the cluster.
@@ -476,11 +458,6 @@ public abstract class BaseTest {
         final HBaseTestingUtility u = utility;
         try {
             destroyDriver();
-            try {
-                tearDownTxManager();
-            } catch (Throwable t) {
-                logger.error("Exception caught when shutting down tx manager", 
t);
-            }
             utility = null;
             clusterInitialized = false;
         } finally {
@@ -519,9 +496,7 @@ public abstract class BaseTest {
     
     protected static void setUpTestDriver(ReadOnlyProps serverProps, 
ReadOnlyProps clientProps) throws Exception {
         if (driver == null) {
-            setTxnConfigs();
             String url = checkClusterInitialized(serverProps);
-            checkTxManagerInitialized(serverProps);
             driver = initAndRegisterTestDriver(url, clientProps);
         }
     }
@@ -593,7 +568,7 @@ public abstract class BaseTest {
         conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 
DEFAULT_RPC_SCHEDULER_FACTORY);
         conf.setLong(HConstants.ZK_SESSION_TIMEOUT, 10 * 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         conf.setLong(HConstants.ZOOKEEPER_TICK_TIME, 6 * 1000);
-
+        
         // override any defaults based on overrideProps
         for (Entry<String,String> entry : overrideProps) {
             conf.set(entry.getKey(), entry.getValue());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index c93e56e..a7569f7 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -20,9 +20,12 @@ package org.apache.phoenix.query;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY;
 import static org.apache.phoenix.query.QueryServicesOptions.withDefaults;
 
+import org.apache.curator.shaded.com.google.common.io.Files;
 import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.tephra.TxConstants;
+import org.apache.twill.internal.utils.Networks;
 
 
 /**
@@ -69,6 +72,7 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
      * because we want to control it's execution ourselves
      */
     public static final long DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY = 
Long.MAX_VALUE;
+    public static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
 
     
     /**
@@ -117,7 +121,16 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
                 .setHConnectionPoolMaxSize(DEFAULT_HCONNECTION_POOL_MAX_SIZE)
                 .setMaxThreadsPerHTable(DEFAULT_HTABLE_MAX_THREADS)
                 
.setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME)
-                
.setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY);
+                
.setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY)
+                // setup default configs for Tephra
+                .set(TxConstants.Manager.CFG_DO_PERSIST, false)
+                .set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, 
"n-times")
+                .set(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1)
+                .set(TxConstants.Service.CFG_DATA_TX_BIND_PORT, 
Networks.getRandomPort())
+                .set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, 
Files.createTempDir().getAbsolutePath())
+                .set(TxConstants.Manager.CFG_TX_TIMEOUT, 
DEFAULT_TXN_TIMEOUT_SECONDS)
+                .set(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L)
+                ;
     }
     
     public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps 
overrideProps) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 1ec07b6..a06fd69 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -780,7 +780,7 @@ public class TestUtil {
         ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
         MutationState mutationState = pconn.getMutationState();
         if (table.isTransactional()) {
-            mutationState.startTransaction();
+            mutationState.startTransaction(table.getTransactionProvider());
         }
         try (HTableInterface htable = mutationState.getHTable(table)) {
             byte[] markerRowKey = Bytes.toBytes("TO_DELETE");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8c42054/phoenix-protocol/src/main/PTable.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/PTable.proto 
b/phoenix-protocol/src/main/PTable.proto
index ba9e0b4..16381dd 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -100,6 +100,7 @@ message PTable {
   optional bytes encodingScheme = 35;
   repeated EncodedCQCounter encodedCQCounters = 36;
   optional bool useStatsForParallelization = 37;
+  optional int32 transactionProvider = 38;
 }
 
 message EncodedCQCounter {

Reply via email to