PHOENIX-2411 Allow Phoenix to participate as transactional component
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cd4f4547 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cd4f4547 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cd4f4547 Branch: refs/heads/4.x-HBase-1.0 Commit: cd4f4547b77d3da9816fc9cbfad59298a04211ca Parents: 16e5c7b Author: James Taylor <[email protected]> Authored: Sat Dec 26 23:56:59 2015 -0800 Committer: James Taylor <[email protected]> Committed: Wed Dec 30 18:14:20 2015 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/tx/TransactionIT.java | 136 ++++++++++++++++++- .../phoenix/exception/SQLExceptionCode.java | 2 + .../apache/phoenix/execute/MutationState.java | 9 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 58 ++++++-- .../apache/phoenix/jdbc/PhoenixStatement.java | 13 +- .../org/apache/phoenix/query/QueryServices.java | 9 +- .../phoenix/query/QueryServicesOptions.java | 6 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 62 ++++++--- 8 files changed, 249 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 929e4a5..e83467a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -20,8 +20,6 @@ package org.apache.phoenix.tx; import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE; -import static org.apache.phoenix.util.TestUtil.analyzeTable; -import static org.apache.phoenix.util.TestUtil.getAllSplits; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -41,22 +39,22 @@ import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; @@ -66,12 +64,15 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import co.cask.tephra.TxConstants; -import co.cask.tephra.hbase10.coprocessor.TransactionProcessor; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.TxConstants; +import co.cask.tephra.hbase10.TransactionAwareHTable; +import co.cask.tephra.hbase10.coprocessor.TransactionProcessor; + public class TransactionIT extends BaseHBaseManagedTimeIT { private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE; @@ -603,4 +604,125 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { } assertEquals(5, count); } + + @Test + public void testExternalTxContext() throws Exception { + ResultSet rs; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + + TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient(); + + String fullTableName = "T"; + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true"); + HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName)); + stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')"); + conn.commit(); + + try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { + rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + } + + // Use HBase level Tephra APIs to start a new transaction + TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW); + TransactionContext txContext = new TransactionContext(txServiceClient, txAware); + txContext.start(); + + // Use HBase APIs to add a new row + Put put = new Put(Bytes.toBytes("z")); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b")); + txAware.put(put); + + // Use Phoenix APIs to add new row (sharing the transaction context) + pconn.setTransactionContext(txContext); + conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('y', 'c', 'c')"); + + // New connection should not see data as it hasn't been committed yet + try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { + rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + } + + // Use new connection to create a row with a conflict + Connection connWithConflict = DriverManager.getConnection(getUrl(), props); + connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('z', 'd', 'd')"); + + // Existing connection should see data even though it hasn't been committed yet + rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + + // Use Tephra APIs directly to finish (i.e. commit) the transaction + txContext.finish(); + + // Confirm that attempt to commit row with conflict fails + try { + connWithConflict.commit(); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode()); + } finally { + connWithConflict.close(); + } + + // New connection should now see data as it has been committed + try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { + rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + } + + // Repeat the same as above, but this time abort the transaction + txContext = new TransactionContext(txServiceClient, txAware); + txContext.start(); + + // Use HBase APIs to add a new row + put = new Put(Bytes.toBytes("j")); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e")); + txAware.put(put); + + // Use Phoenix APIs to add new row (sharing the transaction context) + pconn.setTransactionContext(txContext); + conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('k', 'f', 'f')"); + + // Existing connection should see data even though it hasn't been committed yet + rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(5,rs.getInt(1)); + + connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('k', 'g', 'g')"); + rs = connWithConflict.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + + // Use Tephra APIs directly to abort (i.e. rollback) the transaction + txContext.abort(); + + rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + + // Should succeed since conflicting row was aborted + connWithConflict.commit(); + + // New connection should now see data as it has been committed + try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { + rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + } + + // Even using HBase APIs directly, we shouldn't find 'j' since a delete marker would have been + // written to hide it. + Result result = htable.get(new Get(Bytes.toBytes("j"))); + assertTrue(result.isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index ef135eb..36036ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -280,6 +280,8 @@ public enum SQLExceptionCode { CANNOT_ALTER_TO_BE_TXN_IF_TXNS_DISABLED(1079, "44A10", "Cannot alter table to be transactional table if transactions are disabled"), CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP(1080, "44A11", "Cannot create a transactional table if transactions are disabled"), CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP(1081, "44A12", "Cannot alter table to be transactional table if transactions are disabled"), + TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT(1082, "44A13", "Cannot set transaction context if transactions are disabled"), + TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH(1083, "44A14", "Cannot set auto flush if transactions are disabled"), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index fa5e962..8ae9481 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -124,6 +124,7 @@ public class MutationState implements SQLCloseable { private long sizeOffset; private int numRows = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; + private boolean isExternalTxContext = false; private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; @@ -170,6 +171,7 @@ public class MutationState implements SQLCloseable { .getQueryServices().getTransactionSystemClient(); this.txContext = new TransactionContext(txServiceClient); } else { + isExternalTxContext = true; this.txContext = txContext; } } else { @@ -224,7 +226,12 @@ public class MutationState implements SQLCloseable { boolean hasUncommittedData = false; for (TableRef source : sources) { String sourcePhysicalName = source.getTable().getPhysicalName().getString(); - if (source.getTable().isTransactional() && uncommittedPhysicalNames.contains(sourcePhysicalName)) { + // Tracking uncommitted physical table names is an optimization that prevents us from + // having to do a checkpoint if no data has yet been written. If we're using an + // external transaction context, it's possible that data was already written at the + // current transaction timestamp, so we always checkpoint in that case is we're + // reading and writing to the same table. + if (source.getTable().isTransactional() && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) { hasUncommittedData = true; break; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 9ef1643..d33f20c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -107,6 +107,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Lists; +import co.cask.tephra.TransactionContext; + /** * @@ -125,11 +127,12 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private final String url; private final ConnectionQueryServices services; private final Properties info; - private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); private final Map<PDataType<?>, Format> formatters = new HashMap<>(); - private final MutationState mutationState; private final int mutateBatchSize; private final Long scn; + private MutationState mutationState; + private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); + private boolean isAutoFlush = false; private boolean isAutoCommit = false; private PMetaData metaData; private final PName tenantId; @@ -160,6 +163,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade) throws SQLException { this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade); this.isAutoCommit = connection.isAutoCommit; + this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; this.statementExecutionCounter = connection.statementExecutionCounter; } @@ -179,6 +183,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade()); this.isAutoCommit = connection.isAutoCommit; + this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; this.statementExecutionCounter = connection.statementExecutionCounter; } @@ -218,6 +223,8 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd Long scnParam = JDBCUtil.getCurrentSCN(url, this.info); checkScn(scnParam); this.scn = scnParam; + this.isAutoFlush = this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED) + && this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_FLUSH) ; this.isAutoCommit = JDBCUtil.getAutoCommit( url, this.info, this.services.getProps().getBoolean( @@ -281,17 +288,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private static Properties filterKnownNonProperties(Properties info) { Properties prunedProperties = info; - if (info.contains(PhoenixRuntime.CURRENT_SCN_ATTRIB)) { - if (prunedProperties == info) { - prunedProperties = PropertiesUtil.deepCopy(info); - } - prunedProperties.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); - } - if (info.contains(PhoenixRuntime.TENANT_ID_ATTRIB)) { - if (prunedProperties == info) { - prunedProperties = PropertiesUtil.deepCopy(info); + for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) { + if (info.contains(property)) { + if (prunedProperties == info) { + prunedProperties = PropertiesUtil.deepCopy(info); + } + prunedProperties.remove(property); } - prunedProperties.remove(PhoenixRuntime.TENANT_ID_ATTRIB); } return prunedProperties; } @@ -578,11 +581,40 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd return isAutoCommit; } + public boolean getAutoFlush() { + return isAutoFlush; + } + + public void setAutoFlush(boolean autoFlush) throws SQLException { + if (autoFlush && !this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH) + .build().buildException(); + } + this.isAutoFlush = autoFlush; + } + + public void flush() throws SQLException { + mutationState.sendUncommitted(); + } + + public void setTransactionContext(TransactionContext txContext) throws SQLException { + if (!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT) + .build().buildException(); + } + this.mutationState.rollback(); + this.mutationState = new MutationState(this.mutationState.getMaxSize(), this, txContext); + + // Write data to HBase after each statement execution as the commit may not + // come through Phoenix APIs. + setAutoFlush(true); + } + public Consistency getConsistency() { return this.consistency; } - @Override + @Override public String getCatalog() throws SQLException { return ""; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index bacea92..fe62ea4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -1217,6 +1217,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho PhoenixPreparedStatement statement = batch.get(i); returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount(); } + // Flush all changes in batch if auto flush is true + flushIfNecessary(); // If we make it all the way through, clear the batch clearBatch(); return returnCodes; @@ -1321,9 +1323,17 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) .build().buildException(); } - return executeMutation(stmt); + int updateCount = executeMutation(stmt); + flushIfNecessary(); + return updateCount; } + private void flushIfNecessary() throws SQLException { + if (connection.getAutoFlush()) { + connection.flush(); + } + } + @Override public boolean execute(String sql) throws SQLException { CompilableStatement stmt = parseStatement(sql); @@ -1333,6 +1343,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho .build().buildException(); } executeMutation(stmt); + flushIfNecessary(); return false; } executeQuery(stmt); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 908c479..8f5f9a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -86,7 +86,7 @@ public interface QueryServices extends SQLCloseable { public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs"; - // Deprecated. Use FORCE_ROW_KEY_ORDER instead. + @Deprecated // Use FORCE_ROW_KEY_ORDER instead. public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable"; public static final String USE_INDEXES_ATTRIB = "phoenix.query.useIndexes"; @@ -164,9 +164,14 @@ public interface QueryServices extends SQLCloseable { public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells"; public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls"; public static final String DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB = "phoenix.table.istransactional.default"; - public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled"; public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled"; + // Transaction related configs + public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled"; + // Controls whether or not uncommitted data is automatically sent to HBase + // at the end of a statement execution when transaction state is passed through. + public static final String AUTO_FLUSH_ATTRIB = "phoenix.transactions.autoFlush"; + // rpc queue configs public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count"; public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 730a5ef..b256ff3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -71,8 +71,8 @@ import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; import java.util.HashSet; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Coprocessor; @@ -199,11 +199,13 @@ public class QueryServicesOptions { // TODO Change this to true as part of PHOENIX-1543 // We'll also need this for transactions to work correctly public static final boolean DEFAULT_AUTO_COMMIT = false; - public static final boolean DEFAULT_TRANSACTIONAL = false; public static final boolean DEFAULT_TABLE_ISTRANSACTIONAL = false; public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false; public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true; + public static final boolean DEFAULT_TRANSACTIONAL = false; + public static final boolean DEFAULT_AUTO_FLUSH = false; + private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName(); public static final String DEFAULT_CONSISTENCY_LEVEL = Consistency.STRONG.toString(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd4f4547/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index d4a45f6..a5fd96b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -98,14 +98,6 @@ import com.google.common.collect.Lists; */ public class PhoenixRuntime { /** - * Use this connection property to control HBase timestamps - * by specifying your own long timestamp value at connection time. All - * queries will use this as the upper bound of the time range for scans - * and DDL, and DML will use this as t he timestamp for key values. - */ - public static final String CURRENT_SCN_ATTRIB = "CurrentSCN"; - - /** * Root for the JDBC URL that the Phoenix accepts accepts. */ public final static String JDBC_PROTOCOL = "jdbc:phoenix"; @@ -121,13 +113,12 @@ public class PhoenixRuntime { public final static String EMBEDDED_JDBC_PROTOCOL = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; /** - * Use this connection property to control the number of rows that are - * batched together on an UPSERT INTO table1... SELECT ... FROM table2. - * It's only used when autoCommit is true and your source table is - * different than your target table or your SELECT statement has a - * GROUP BY clause. + * Use this connection property to control HBase timestamps + * by specifying your own long timestamp value at connection time. All + * queries will use this as the upper bound of the time range for scans + * and DDL, and DML will use this as t he timestamp for key values. */ - public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize"; + public static final String CURRENT_SCN_ATTRIB = "CurrentSCN"; /** * Use this connection property to help with fairness of resource allocation @@ -139,11 +130,13 @@ public class PhoenixRuntime { public static final String TENANT_ID_ATTRIB = "TenantId"; /** - * Use this connection property prefix for annotations that you want to show up in traces and log lines emitted by Phoenix. - * This is useful for annotating connections with information available on the client (e.g. user or session identifier) and - * having these annotation automatically passed into log lines and traces by Phoenix. + * Use this connection property to control the number of rows that are + * batched together on an UPSERT INTO table1... SELECT ... FROM table2. + * It's only used when autoCommit is true and your source table is + * different than your target table or your SELECT statement has a + * GROUP BY clause. */ - public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation."; + public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize"; /** * Use this connection property to explicitly enable or disable auto-commit on a new connection. @@ -151,11 +144,38 @@ public class PhoenixRuntime { public static final String AUTO_COMMIT_ATTRIB = "AutoCommit"; /** +<<<<<<< HEAD +======= +<<<<<<< HEAD +======= +>>>>>>> 4e96747... PHOENIX-2411 Allow Phoenix to participate as transactional component * Use this connection property to explicitly set read consistency level on a new connection. */ public static final String CONSISTENCY_ATTRIB = "Consistency"; /** +<<<<<<< HEAD +======= + * Use this connection property to explicitly enable or disable request level metric collection. + */ + public static final String REQUEST_METRIC_ATTRIB = "RequestMetric"; + + /** + * All Phoenix specific connection properties + * TODO: use enum instead + */ + public final static String[] CONNECTION_PROPERTIES = { + CURRENT_SCN_ATTRIB, + TENANT_ID_ATTRIB, + UPSERT_BATCH_SIZE_ATTRIB, + AUTO_COMMIT_ATTRIB, + CONSISTENCY_ATTRIB, + REQUEST_METRIC_ATTRIB + }; + + /** +>>>>>>> 5180ae0... PHOENIX-2411 Allow Phoenix to participate as transactional component +>>>>>>> 4e96747... PHOENIX-2411 Allow Phoenix to participate as transactional component * Use this as the zookeeper quorum name to have a connection-less connection. This enables * Phoenix-compatible HFiles to be created in a map/reduce job by creating tables, * upserting data into them, and getting the uncommitted state through {@link #getUncommittedData(Connection)} @@ -163,9 +183,11 @@ public class PhoenixRuntime { public final static String CONNECTIONLESS = "none"; /** - * Use this connection property to explicitly enable or disable request level metric collection. + * Use this connection property prefix for annotations that you want to show up in traces and log lines emitted by Phoenix. + * This is useful for annotating connections with information available on the client (e.g. user or session identifier) and + * having these annotation automatically passed into log lines and traces by Phoenix. */ - public static final String REQUEST_METRIC_ATTRIB = "RequestMetric"; + public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation."; private static final String HEADER_IN_LINE = "in-line"; private static final String SQL_FILE_EXT = ".sql";
