PHOENIX-2411 Allow Phoenix to participate as transactional component

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cd4f4547
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cd4f4547
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cd4f4547

Branch: refs/heads/4.x-HBase-1.0
Commit: cd4f4547b77d3da9816fc9cbfad59298a04211ca
Parents: 16e5c7b
Author: James Taylor <[email protected]>
Authored: Sat Dec 26 23:56:59 2015 -0800
Committer: James Taylor <[email protected]>
Committed: Wed Dec 30 18:14:20 2015 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/tx/TransactionIT.java    | 136 ++++++++++++++++++-
 .../phoenix/exception/SQLExceptionCode.java     |   2 +
 .../apache/phoenix/execute/MutationState.java   |   9 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  58 ++++++--
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  13 +-
 .../org/apache/phoenix/query/QueryServices.java |   9 +-
 .../phoenix/query/QueryServicesOptions.java     |   6 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |  62 ++++++---
 8 files changed, 249 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 929e4a5..e83467a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -20,8 +20,6 @@ package org.apache.phoenix.tx;
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
-import static org.apache.phoenix.util.TestUtil.analyzeTable;
-import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -41,22 +39,22 @@ import java.util.Properties;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -66,12 +64,15 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase10.TransactionAwareHTable;
+import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
+
 public class TransactionIT extends BaseHBaseManagedTimeIT {
        
        private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
@@ -603,4 +604,125 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         }
         assertEquals(5, count);
     }
+    
+    @Test
+    public void testExternalTxContext() throws Exception {
+        ResultSet rs;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        
+        TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient();
+
+        String fullTableName = "T";
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
+        HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
+        stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
+        conn.commit();
+
+        try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+            rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+        }
+
+        // Use HBase level Tephra APIs to start a new transaction
+        TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW);
+        TransactionContext txContext = new TransactionContext(txServiceClient, txAware);
+        txContext.start();
+        
+        // Use HBase APIs to add a new row
+        Put put = new Put(Bytes.toBytes("z"));
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b"));
+        txAware.put(put);
+        
+        // Use Phoenix APIs to add new row (sharing the transaction context)
+        pconn.setTransactionContext(txContext);
+        conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('y', 'c', 'c')");
+
+        // New connection should not see data as it hasn't been committed yet
+        try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+            rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+        }
+        
+        // Use new connection to create a row with a conflict
+        Connection connWithConflict = DriverManager.getConnection(getUrl(), props);
+        connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('z', 'd', 'd')");
+        
+        // Existing connection should see data even though it hasn't been committed yet
+        rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+        assertTrue(rs.next());
+        assertEquals(3,rs.getInt(1));
+        
+        // Use Tephra APIs directly to finish (i.e. commit) the transaction
+        txContext.finish();
+        
+        // Confirm that attempt to commit row with conflict fails
+        try {
+            connWithConflict.commit();
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode());
+        } finally {
+            connWithConflict.close();
+        }
+        
+        // New connection should now see data as it has been committed
+        try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+            rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(3,rs.getInt(1));
+        }
+        
+        // Repeat the same as above, but this time abort the transaction
+        txContext = new TransactionContext(txServiceClient, txAware);
+        txContext.start();
+        
+        // Use HBase APIs to add a new row
+        put = new Put(Bytes.toBytes("j"));
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e"));
+        txAware.put(put);
+        
+        // Use Phoenix APIs to add new row (sharing the transaction context)
+        pconn.setTransactionContext(txContext);
+        conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('k', 'f', 'f')");
+        
+        // Existing connection should see data even though it hasn't been committed yet
+        rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+        assertTrue(rs.next());
+        assertEquals(5,rs.getInt(1));
+
+        connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('k', 'g', 'g')");
+        rs = connWithConflict.createStatement().executeQuery("select count(*) from " + fullTableName);
+        assertTrue(rs.next());
+        assertEquals(4,rs.getInt(1));
+
+        // Use Tephra APIs directly to abort (i.e. rollback) the transaction
+        txContext.abort();
+        
+        rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+        assertTrue(rs.next());
+        assertEquals(3,rs.getInt(1));
+
+        // Should succeed since conflicting row was aborted
+        connWithConflict.commit();
+
+        // New connection should now see data as it has been committed
+        try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+            rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(4,rs.getInt(1));
+        }
+        
+        // Even using HBase APIs directly, we shouldn't find 'j' since a delete marker would have been
+        // written to hide it.
+        Result result = htable.get(new Get(Bytes.toBytes("j")));
+        assertTrue(result.isEmpty());
+    }
 }
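
Distilled from the test above, the end-to-end pattern this change enables looks roughly like the following sketch (not part of the commit; the JDBC URL passed in and the table name T are placeholders, and the table is assumed to have been created with TRANSACTIONAL=true):

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.query.QueryConstants;

    import co.cask.tephra.TransactionContext;
    import co.cask.tephra.TransactionSystemClient;
    import co.cask.tephra.TxConstants;
    import co.cask.tephra.hbase10.TransactionAwareHTable;

    public class ExternalTxSketch {
        public static void run(String jdbcUrl) throws Exception {
            try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
                PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
                TransactionSystemClient txClient =
                        pconn.getQueryServices().getTransactionSystemClient();
                HTableInterface htable =
                        pconn.getQueryServices().getTable(Bytes.toBytes("T"));

                // Start a transaction at the Tephra/HBase level.
                TransactionAwareHTable txAware =
                        new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW);
                TransactionContext txContext = new TransactionContext(txClient, txAware);
                txContext.start();

                // Write through raw HBase APIs inside the transaction
                // (the test above also writes Phoenix's empty key value column alongside the data).
                Put put = new Put(Bytes.toBytes("z"));
                put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
                        QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
                        Bytes.toBytes("V1"), Bytes.toBytes("b"));
                txAware.put(put);

                // Hand the same context to Phoenix so its mutations join the transaction.
                pconn.setTransactionContext(txContext);
                conn.createStatement().executeUpdate("UPSERT INTO T VALUES('y', 'c', 'c')");

                // Commit (or abort) outside of Phoenix.
                txContext.finish();
            }
        }
    }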

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index ef135eb..36036ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -280,6 +280,8 @@ public enum SQLExceptionCode {
     CANNOT_ALTER_TO_BE_TXN_IF_TXNS_DISABLED(1079, "44A10", "Cannot alter table to be transactional table if transactions are disabled"),
     CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP(1080, "44A11", "Cannot create a transactional table if transactions are disabled"),
     CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP(1081, "44A12", "Cannot alter table to be transactional table if transactions are disabled"),
+    TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT(1082, "44A13", "Cannot set transaction context if transactions are disabled"),
+    TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH(1083, "44A14", "Cannot set auto flush if transactions are disabled"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
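
Both new codes surface on the client as ordinary SQLExceptions; a minimal sketch of checking for one of them (hypothetical helper, assuming a cluster where phoenix.transactions.enabled is false):

    import java.sql.Connection;
    import java.sql.SQLException;

    import org.apache.phoenix.exception.SQLExceptionCode;
    import org.apache.phoenix.jdbc.PhoenixConnection;

    import co.cask.tephra.TransactionContext;

    public class TxDisabledCheckSketch {
        static boolean trySetContext(Connection conn, TransactionContext txContext) throws SQLException {
            try {
                conn.unwrap(PhoenixConnection.class).setTransactionContext(txContext);
                return true;
            } catch (SQLException e) {
                // 1082 / SQLSTATE 44A13: transactions are disabled, so an external
                // transaction context cannot be attached to this connection.
                if (e.getErrorCode() == SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT.getErrorCode()) {
                    return false;
                }
                throw e;
            }
        }
    }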

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index fa5e962..8ae9481 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -124,6 +124,7 @@ public class MutationState implements SQLCloseable {
     private long sizeOffset;
     private int numRows = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
+    private boolean isExternalTxContext = false;
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -170,6 +171,7 @@ public class MutationState implements SQLCloseable {
                                        .getQueryServices().getTransactionSystemClient();
                        this.txContext = new TransactionContext(txServiceClient);
                    } else {
+                       isExternalTxContext = true;
                        this.txContext = txContext;
                    }
                } else {
@@ -224,7 +226,12 @@ public class MutationState implements SQLCloseable {
             boolean hasUncommittedData = false;
             for (TableRef source : sources) {
                 String sourcePhysicalName = source.getTable().getPhysicalName().getString();
-                if (source.getTable().isTransactional() && uncommittedPhysicalNames.contains(sourcePhysicalName)) {
+                // Tracking uncommitted physical table names is an optimization that prevents us from
+                // having to do a checkpoint if no data has yet been written. If we're using an
+                // external transaction context, it's possible that data was already written at the
+                // current transaction timestamp, so we always checkpoint in that case if we're
+                // reading and writing to the same table.
+                if (source.getTable().isTransactional() && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) {
                     hasUncommittedData = true;
                     break;
                 }
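
To make the scenario described in that comment concrete, here is a hypothetical sequence (table T and the surrounding objects are placeholders, not part of the commit) where the forced checkpoint matters: data already written at the current transaction timestamp through the external context, followed by a Phoenix statement that reads and writes the same table.

    import java.sql.Connection;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.query.QueryConstants;

    import co.cask.tephra.TransactionContext;
    import co.cask.tephra.hbase10.TransactionAwareHTable;

    public class ExternalCheckpointSketch {
        // txContext/txAware are assumed to already wrap the HTable backing table T.
        static void runStatement(Connection conn, TransactionContext txContext,
                TransactionAwareHTable txAware) throws Exception {
            txContext.start();

            // Data written at the current transaction timestamp through the external context...
            Put put = new Put(Bytes.toBytes("k1"));
            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
                    QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
            txAware.put(put);

            // ...is invisible to Phoenix's uncommittedPhysicalNames bookkeeping, so when the
            // statement below both reads and writes T, isExternalTxContext forces the checkpoint.
            conn.unwrap(PhoenixConnection.class).setTransactionContext(txContext);
            conn.createStatement().executeUpdate(
                    "UPSERT INTO T(K, V1) SELECT K || '2', V1 FROM T");

            txContext.finish();
        }
    }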

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 9ef1643..d33f20c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -107,6 +107,8 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
 
+import co.cask.tephra.TransactionContext;
+
 
 /**
  * 
@@ -125,11 +127,12 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private final String url;
     private final ConnectionQueryServices services;
     private final Properties info;
-    private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
-    private final MutationState mutationState;
     private final int mutateBatchSize;
     private final Long scn;
+    private MutationState mutationState;
+    private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
+    private boolean isAutoFlush = false;
     private boolean isAutoCommit = false;
     private PMetaData metaData;
     private final PName tenantId;
@@ -160,6 +163,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade) throws SQLException {
         this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade);
         this.isAutoCommit = connection.isAutoCommit;
+        this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
         this.statementExecutionCounter = connection.statementExecutionCounter;
     }
@@ -179,6 +183,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
         this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade());
         this.isAutoCommit = connection.isAutoCommit;
+        this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
         this.statementExecutionCounter = connection.statementExecutionCounter;
     }
@@ -218,6 +223,8 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         Long scnParam = JDBCUtil.getCurrentSCN(url, this.info);
         checkScn(scnParam);
         this.scn = scnParam;
+        this.isAutoFlush = this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)
+                && this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_FLUSH) ;
         this.isAutoCommit = JDBCUtil.getAutoCommit(
                 url, this.info,
                 this.services.getProps().getBoolean(
@@ -281,17 +288,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
 
     private static Properties filterKnownNonProperties(Properties info) {
         Properties prunedProperties = info;
-        if (info.contains(PhoenixRuntime.CURRENT_SCN_ATTRIB)) {
-            if (prunedProperties == info) {
-                prunedProperties = PropertiesUtil.deepCopy(info);
-            }
-            prunedProperties.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
-        }
-        if (info.contains(PhoenixRuntime.TENANT_ID_ATTRIB)) {
-            if (prunedProperties == info) {
-                prunedProperties = PropertiesUtil.deepCopy(info);
+        for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) {
+            if (info.contains(property)) {
+                if (prunedProperties == info) {
+                    prunedProperties = PropertiesUtil.deepCopy(info);
+                }
+                prunedProperties.remove(property);
             }
-            prunedProperties.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
         }
         return prunedProperties;
     }
@@ -578,11 +581,40 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         return isAutoCommit;
     }
 
+    public boolean getAutoFlush() {
+        return isAutoFlush;
+    }
+
+    public void setAutoFlush(boolean autoFlush) throws SQLException {
+        if (autoFlush && !this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH)
+            .build().buildException();
+        }
+        this.isAutoFlush = autoFlush;
+    }
+
+    public void flush() throws SQLException {
+        mutationState.sendUncommitted();
+    }
+        
+    public void setTransactionContext(TransactionContext txContext) throws SQLException {
+        if (!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT)
+            .build().buildException();
+        }
+        this.mutationState.rollback();
+        this.mutationState = new MutationState(this.mutationState.getMaxSize(), this, txContext);
+        
+        // Write data to HBase after each statement execution as the commit may not
+        // come through Phoenix APIs.
+        setAutoFlush(true);
+    }
+    
     public Consistency getConsistency() {
         return this.consistency;
     }
 
-    @Override
+   @Override
     public String getCatalog() throws SQLException {
         return "";
     }
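
The three methods added above give callers explicit control over when uncommitted data reaches HBase. A small usage sketch (not part of the commit; the JDBC URL and table are placeholders, and phoenix.transactions.enabled must be true or setAutoFlush(true) throws the new 44A14 error):

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.phoenix.jdbc.PhoenixConnection;

    public class AutoFlushSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);

                // Push uncommitted mutations to HBase after every statement, without
                // committing, so a commit issued outside Phoenix will include them.
                pconn.setAutoFlush(true);
                conn.createStatement().executeUpdate("UPSERT INTO T VALUES('x', 'a', 'a')");

                // Manual equivalent when auto flush is off:
                // pconn.flush();   // delegates to MutationState.sendUncommitted()
            }
        }
    }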

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index bacea92..fe62ea4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1217,6 +1217,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 PhoenixPreparedStatement statement = batch.get(i);
                 returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount();
             }
+            // Flush all changes in batch if auto flush is true
+            flushIfNecessary();
             // If we make it all the way through, clear the batch
             clearBatch();
             return returnCodes;
@@ -1321,9 +1323,17 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
             .build().buildException();
         }
-        return executeMutation(stmt);
+        int updateCount = executeMutation(stmt);
+        flushIfNecessary();
+        return updateCount;
     }
 
+    private void flushIfNecessary() throws SQLException {
+        if (connection.getAutoFlush()) {
+            connection.flush();
+        }
+    }
+    
     @Override
     public boolean execute(String sql) throws SQLException {
         CompilableStatement stmt = parseStatement(sql);
@@ -1333,6 +1343,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 .build().buildException();
             }
             executeMutation(stmt);
+            flushIfNecessary();
             return false;
         }
         executeQuery(stmt);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 908c479..8f5f9a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -86,7 +86,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
     public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
     
-    // Deprecated. Use FORCE_ROW_KEY_ORDER instead.
+    @Deprecated // Use FORCE_ROW_KEY_ORDER instead.
     public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB  = "phoenix.query.rowKeyOrderSaltedTable";
     
     public static final String USE_INDEXES_ATTRIB  = "phoenix.query.useIndexes";
@@ -164,9 +164,14 @@ public interface QueryServices extends SQLCloseable {
     public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
     public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
     public static final String DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB = "phoenix.table.istransactional.default";
-    public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled";
     public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
     
+    // Transaction related configs
+    public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled";
+    // Controls whether or not uncommitted data is automatically sent to HBase
+    // at the end of a statement execution when transaction state is passed through.
+    public static final String AUTO_FLUSH_ATTRIB = "phoenix.transactions.autoFlush";
+
     // rpc queue configs
     public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
     public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 730a5ef..b256ff3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -71,8 +71,8 @@ import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
 
 import java.util.HashSet;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -199,11 +199,13 @@ public class QueryServicesOptions {
     // TODO Change this to true as part of PHOENIX-1543
     // We'll also need this for transactions to work correctly
     public static final boolean DEFAULT_AUTO_COMMIT = false;
-    public static final boolean DEFAULT_TRANSACTIONAL = false;
     public static final boolean DEFAULT_TABLE_ISTRANSACTIONAL = false;
     public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false;
     public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
     
+    public static final boolean DEFAULT_TRANSACTIONAL = false;
+    public static final boolean DEFAULT_AUTO_FLUSH = false;
+
     private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
     
     public static final String DEFAULT_CONSISTENCY_LEVEL = Consistency.STRONG.toString();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index d4a45f6..a5fd96b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -98,14 +98,6 @@ import com.google.common.collect.Lists;
  */
 public class PhoenixRuntime {
     /**
-     * Use this connection property to control HBase timestamps
-     * by specifying your own long timestamp value at connection time. All
-     * queries will use this as the upper bound of the time range for scans
-     * and DDL, and DML will use this as t he timestamp for key values.
-     */
-    public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
-
-    /**
      * Root for the JDBC URL that the Phoenix accepts accepts.
      */
     public final static String JDBC_PROTOCOL = "jdbc:phoenix";
@@ -121,13 +113,12 @@ public class PhoenixRuntime {
     public final static String EMBEDDED_JDBC_PROTOCOL = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
     /**
-     * Use this connection property to control the number of rows that are
-     * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
-     * It's only used when autoCommit is true and your source table is
-     * different than your target table or your SELECT statement has a
-     * GROUP BY clause.
+     * Use this connection property to control HBase timestamps
+     * by specifying your own long timestamp value at connection time. All
+     * queries will use this as the upper bound of the time range for scans
+     * and DDL, and DML will use this as the timestamp for key values.
      */
-    public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
+    public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
 
     /**
     * Use this connection property to help with fairness of resource allocation
@@ -139,11 +130,13 @@ public class PhoenixRuntime {
     public static final String TENANT_ID_ATTRIB = "TenantId";
 
     /**
-     * Use this connection property prefix for annotations that you want to show up in traces and log lines emitted by Phoenix.
-     * This is useful for annotating connections with information available on the client (e.g. user or session identifier) and
-     * having these annotation automatically passed into log lines and traces by Phoenix.
+     * Use this connection property to control the number of rows that are
+     * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
+     * It's only used when autoCommit is true and your source table is
+     * different than your target table or your SELECT statement has a
+     * GROUP BY clause.
      */
-    public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation.";
+    public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
 
     /**
     * Use this connection property to explicitly enable or disable auto-commit on a new connection.
@@ -151,11 +144,38 @@ public class PhoenixRuntime {
     public static final String AUTO_COMMIT_ATTRIB = "AutoCommit";
 
     /**
+<<<<<<< HEAD
+=======
+<<<<<<< HEAD
+=======
+>>>>>>> 4e96747... PHOENIX-2411 Allow Phoenix to participate as transactional 
component
      * Use this connection property to explicitly set read consistency level 
on a new connection.
      */
     public static final String CONSISTENCY_ATTRIB = "Consistency";
 
     /**
+<<<<<<< HEAD
+=======
+     * Use this connection property to explicitly enable or disable request 
level metric collection.
+     */
+    public static final String REQUEST_METRIC_ATTRIB = "RequestMetric";
+
+    /**
+     * All Phoenix specific connection properties
+     * TODO: use enum instead
+     */
+    public final static String[] CONNECTION_PROPERTIES = {
+            CURRENT_SCN_ATTRIB,
+            TENANT_ID_ATTRIB,
+            UPSERT_BATCH_SIZE_ATTRIB,
+            AUTO_COMMIT_ATTRIB,
+            CONSISTENCY_ATTRIB,
+            REQUEST_METRIC_ATTRIB
+            };
+
+    /**
+>>>>>>> 5180ae0... PHOENIX-2411 Allow Phoenix to participate as transactional 
component
+>>>>>>> 4e96747... PHOENIX-2411 Allow Phoenix to participate as transactional 
component
      * Use this as the zookeeper quorum name to have a connection-less 
connection. This enables
      * Phoenix-compatible HFiles to be created in a map/reduce job by creating 
tables,
      * upserting data into them, and getting the uncommitted state through 
{@link #getUncommittedData(Connection)}
@@ -163,9 +183,11 @@ public class PhoenixRuntime {
     public final static String CONNECTIONLESS = "none";
     
     /**
-     * Use this connection property to explicitly enable or disable request level metric collection.
+     * Use this connection property prefix for annotations that you want to show up in traces and log lines emitted by Phoenix.
+     * This is useful for annotating connections with information available on the client (e.g. user or session identifier) and
+     * having these annotation automatically passed into log lines and traces by Phoenix.
      */
-    public static final String REQUEST_METRIC_ATTRIB = "RequestMetric";
+    public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation.";
 
     private static final String HEADER_IN_LINE = "in-line";
     private static final String SQL_FILE_EXT = ".sql";
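
The new CONNECTION_PROPERTIES array is what filterKnownNonProperties in PhoenixConnection (earlier in this diff) now iterates, so Phoenix-specific properties are stripped in one place instead of one if-block per property. The same array can be reused from caller code; a hypothetical helper, not part of the commit:

    import java.util.Properties;

    import org.apache.phoenix.util.PhoenixRuntime;

    public class ConnectionPropsSketch {
        // Remove all Phoenix-specific connection properties before handing the rest
        // to a component that only understands HBase/Hadoop configuration keys.
        static Properties stripPhoenixProperties(Properties info) {
            Properties pruned = new Properties();
            pruned.putAll(info);
            for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) {
                pruned.remove(property);  // CurrentSCN, TenantId, UpsertBatchSize, AutoCommit, Consistency, RequestMetric
            }
            return pruned;
        }
    }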
