PHOENIX-5026 Add client setting to disable server side mutations.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f6b75942 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f6b75942 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f6b75942 Branch: refs/heads/4.x-cdh5.15 Commit: f6b75942701dbf90d7dc3d69be6265130e69ff94 Parents: bb17957 Author: Lars Hofhansl <la...@apache.org> Authored: Thu Nov 22 03:53:14 2018 +0000 Committer: Pedro Boado <pbo...@apache.org> Committed: Tue Nov 27 15:12:18 2018 +0000 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/DeleteIT.java | 62 ++++++++--- .../end2end/UpsertSelectAutoCommitIT.java | 26 +++-- .../apache/phoenix/end2end/UpsertSelectIT.java | 103 +++++++++++++------ .../apache/phoenix/compile/DeleteCompiler.java | 6 +- .../apache/phoenix/compile/UpsertCompiler.java | 6 +- .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 3 + 7 files changed, 159 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f6b75942/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java index 5e65927..39210fa 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java @@ -40,12 +40,26 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; - +@RunWith(Parameterized.class) public class 
DeleteIT extends ParallelStatsDisabledIT { private static final int NUMBER_OF_ROWS = 20; private static final int NTH_ROW_NULL = 5; - + + private final String allowServerSideMutations; + + public DeleteIT(String allowServerSideMutations) { + this.allowServerSideMutations = allowServerSideMutations; + } + + @Parameters(name="DeleteIT_allowServerSideMutations={0}") // name is used by failsafe as file name in reports + public static Object[] data() { + return new Object[] {"true", "false"}; + } + private static String initTableValues(Connection conn) throws SQLException { String tableName = generateUniqueName(); ensureTableCreated(getUrl(), tableName, "IntIntKeyTest"); @@ -75,7 +89,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { } private void testDeleteFilter(boolean autoCommit) throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + Connection conn = DriverManager.getConnection(getUrl(), props); String tableName = initTableValues(conn); assertTableCount(conn, tableName, NUMBER_OF_ROWS); @@ -102,7 +118,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { } private void testDeleteByFilterAndRow(boolean autoCommit) throws SQLException { - Connection conn = DriverManager.getConnection(getUrl()); + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + Connection conn = DriverManager.getConnection(getUrl(), props); String tableName = initTableValues(conn); assertTableCount(conn, tableName, NUMBER_OF_ROWS); @@ -167,7 +185,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { } private void testDeleteRange(boolean autoCommit, boolean createIndex, boolean local) throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); + Properties props = new Properties(); + 
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + Connection conn = DriverManager.getConnection(getUrl(), props); String tableName = initTableValues(conn); String indexName = generateUniqueName(); String localIndexName = generateUniqueName(); @@ -298,7 +318,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted, boolean localIndex) throws Exception { Connection con = null; try { - con = DriverManager.getConnection(getUrl()); + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + con = DriverManager.getConnection(getUrl(), props); con.setAutoCommit(autoCommit); Statement stm = con.createStatement(); @@ -390,7 +412,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { Connection con = null; try { boolean autoCommit = false; - con = DriverManager.getConnection(getUrl()); + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + con = DriverManager.getConnection(getUrl(), props); con.setAutoCommit(autoCommit); Statement stm = con.createStatement(); @@ -465,7 +489,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { Connection con = null; try { boolean autoCommit = false; - con = DriverManager.getConnection(getUrl()); + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + con = DriverManager.getConnection(getUrl(), props); con.setAutoCommit(autoCommit); Statement stm = con.createStatement(); @@ -588,7 +614,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { private void testDeleteAllFromTable(boolean autoCommit) throws SQLException { Connection con = null; try { - con = DriverManager.getConnection(getUrl()); + Properties props = new Properties(); + 
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + con = DriverManager.getConnection(getUrl(), props); con.setAutoCommit(autoCommit); String tableName = generateUniqueName(); @@ -649,7 +677,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { } private void testDeleteForTableWithRowTimestampCol(boolean autoCommit, String tableName) throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(autoCommit); Statement stm = conn.createStatement(); stm.execute("CREATE TABLE IF NOT EXISTS " + tableName + @@ -733,7 +763,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { + "CREATE INDEX IF NOT EXISTS index_column_varchar_id ON " + tableName + "(varchar_id);" + "CREATE INDEX IF NOT EXISTS index_column_double_id ON " + tableName + "(double_id);" + "UPSERT INTO " + tableName + " VALUES (9000000,0.5,'Sample text extra');" ; - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(true); Statement stm = conn.createStatement(); for (String sql : commands.split(";")) { @@ -755,7 +787,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { String ddl = "CREATE TABLE IF NOT EXISTS " + tableName + " (pk1 DECIMAL NOT NULL, v1 VARCHAR CONSTRAINT PK PRIMARY KEY (pk1))"; int numRecords = 1010; - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { 
conn.createStatement().execute(ddl); Statement stmt = conn.createStatement(); for (int i = 0; i < numRecords ; i++) { @@ -788,7 +822,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { + " (pk1 DECIMAL NOT NULL, v1 VARCHAR, v2 VARCHAR CONSTRAINT PK PRIMARY KEY (pk1))"; String idx1 = "CREATE INDEX " + indexName1 + " ON " + tableName + "(v1)"; String idx2 = "CREATE INDEX " + indexName2 + " ON " + tableName + "(v1, v2)"; - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute(ddl); conn.createStatement().execute(idx1); conn.createStatement().execute(idx2); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f6b75942/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java index 6210852..c56296c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java @@ -23,35 +23,44 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; -import 
org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; - +@RunWith(Parameterized.class) public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT { - public UpsertSelectAutoCommitIT() { + private final String allowServerSideMutations; + + public UpsertSelectAutoCommitIT(String allowServerSideMutations) { + this.allowServerSideMutations = allowServerSideMutations; + } + + @Parameters(name="UpsertSelectAutoCommitIT_allowServerSideMutations={0}") // name is used by failsafe as file name in reports + public static Object[] data() { + return new Object[] {"true", "false"}; } @Test public void testAutoCommitUpsertSelect() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(true); String atable = generateUniqueName(); @@ -104,7 +113,9 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT { @Test public void testDynamicUpsertSelect() throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + Connection conn = DriverManager.getConnection(getUrl(), props); String tableName = generateUniqueName(); String cursorDDL = " CREATE TABLE IF NOT EXISTS " + tableName + " (ORGANIZATION_ID VARCHAR(15) NOT NULL, \n" @@ -161,6 +172,7 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT { 
props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(true); conn.createStatement().execute("CREATE SEQUENCE keys CACHE 1000"); @@ -189,6 +201,7 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT { Properties connectionProperties = new Properties(); connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3"); connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "50000"); + connectionProperties.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); connection.setAutoCommit(true); @@ -214,6 +227,7 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT { props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(false); String tableName = generateUniqueName(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f6b75942/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java index 5db1fdd..b5aa8be 100644 
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java @@ -58,8 +58,22 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) public class UpsertSelectIT extends ParallelStatsDisabledIT { + private final String allowServerSideMutations; + + public UpsertSelectIT(String allowServerSideMutations) { + this.allowServerSideMutations = allowServerSideMutations; + } + + @Parameters(name="UpsertSelecttIT_allowServerSideMutations={0}") // name is used by failsafe as file name in reports + public static Object[] data() { + return new Object[] {"true", "false"}; + } @Test public void testUpsertSelectWithNoIndex() throws Exception { @@ -85,6 +99,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { String tenantId = getOrganizationId(); byte[][] splits = getDefaultSplits(tenantId); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); String aTable = initATableValues(tenantId, saltTable ? null : splits, null, null, getUrl(), saltTable ? 
"salt_buckets = 2" : null); String customEntityTable = generateUniqueName(); @@ -234,6 +249,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { String ptsdbTable = generateUniqueName(); ensureTableCreated(getUrl(), ptsdbTable, PTSDB_NAME); Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(false); String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " + @@ -404,6 +420,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { String ptsdbTable = generateUniqueName(); ensureTableCreated(getUrl(), ptsdbTable, PTSDB_NAME); Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(autoCommit); String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " + @@ -475,6 +492,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { String tableName = generateUniqueName(); ensureTableCreated(getUrl(), tableName, "IntKeyTest", splits, null); Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); String upsert = "UPSERT INTO " + tableName + " VALUES(1)"; PreparedStatement upsertStmt = conn.prepareStatement(upsert); @@ -509,6 +527,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { String tableName = generateUniqueName(); createTestTable(getUrl(), "create table " + tableName + " (i integer not null primary key desc, j integer)", splits, null); Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); String upsert = "UPSERT INTO " + 
tableName + " VALUES(1, 1)"; PreparedStatement upsertStmt = conn.prepareStatement(upsert); @@ -566,6 +585,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { String tableName = generateUniqueName(); createTestTable(getUrl(), "create table " + tableName + " (i integer not null primary key desc, j integer)", splits, null); Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); String upsert = "UPSERT INTO " + tableName + " VALUES(1, 1)"; PreparedStatement upsertStmt = conn.prepareStatement(upsert); @@ -603,6 +623,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { String tableName = generateUniqueName(); ensureTableCreated(getUrl(), tableName, "IntKeyTest", splits, null, null); Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); String upsert = "UPSERT INTO " + tableName + " VALUES(?)"; PreparedStatement upsertStmt = conn.prepareStatement(upsert); @@ -640,6 +661,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { @Test public void testUpsertSelectWithLimit() throws Exception { Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); String tableName = generateUniqueName(); conn.createStatement().execute("create table " + tableName + " (id varchar(10) not null primary key, val varchar(10), ts timestamp)"); @@ -703,6 +725,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { @Test public void testUpsertSelectWithSequence() throws Exception { Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = 
DriverManager.getConnection(getUrl(), props); String t1 = generateUniqueName(); String t2 = generateUniqueName(); @@ -745,6 +768,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { public void testUpsertSelectWithSequenceAndOrderByWithSalting() throws Exception { int numOfRecords = 200; Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); String t1 = generateUniqueName(); String t2 = generateUniqueName(); @@ -788,6 +812,8 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { assertEquals(seq - 1, rs2.getLong("k1")); seq++; } + // clean up after ourselves + conn.createStatement().execute("drop sequence s"); conn.close(); } @@ -806,7 +832,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { // The timestamp of the put will be the value of the row_timestamp column. long rowTimestamp = EnvironmentEdgeManager.currentTimeMillis(); Date rowTimestampDate = new Date(rowTimestamp); - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + t1 + " (PK1, PK2, KV1) VALUES(?, ?, ?)"); stmt.setString(1, "PK1"); stmt.setDate(2, rowTimestampDate); @@ -833,7 +861,6 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } // Verify that you can't see the data in T2 if the connection is at a timestamp lower than the row timestamp. - Properties props = new Properties(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(rowTimestamp-1)); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t2 + " WHERE PK1 = ? 
AND PK2 = ?"); @@ -873,14 +900,16 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { @Test public void testUpsertSelectSameTableWithRowTimestampColumn() throws Exception { String tableName = generateUniqueName(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) "); } // The timestamp of the put will be the value of the row_timestamp column. long rowTimestamp = 100; Date rowTimestampDate = new Date(rowTimestamp); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, PK2, KV1) VALUES(?, ?, ?)"); stmt.setInt(1, 1); stmt.setDate(2, rowTimestampDate); @@ -889,18 +918,18 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { conn.commit(); } String seq = generateUniqueName(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("CREATE SEQUENCE " + seq); } // Upsert select data into table. The connection needs to be at a timestamp beyond the row timestamp. Otherwise // it won't see the data from table. - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR " + seq + ", PK2 FROM " + tableName); conn.commit(); } // Upsert select using sequences. 
- try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(true); for (int i = 0; i < 10; i++) { int count = conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR " + seq + ", PK2 FROM " + tableName); @@ -920,7 +949,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { conn.createStatement().execute("CREATE TABLE " + table3 + " (T3PK1 VARCHAR NOT NULL, T3PK2 DATE NOT NULL, T3KV1 VARCHAR, T3KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T3PK1, T3PK2 DESC ROW_TIMESTAMP)) "); } long startTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Upsert values where row_timestamp column PK2 is not set and the column names are specified // This should upsert data with the value for PK2 as server timestamp PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table1 + " (T1PK1, T1KV1, T1KV2) VALUES (?, ?, ?)"); @@ -932,7 +963,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } long endTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see // the data in this query. PreparedStatement stmt = conn.prepareStatement("SELECT T1KV1, T1KV2 FROM " + table1 + " WHERE T1PK1 = ? AND T1PK2 >= ? 
AND T1PK2 <= ?"); @@ -947,7 +978,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } startTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Upsert select into table2 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the server timestamp PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table2 + " (T2PK1, T2KV1, T2KV2) SELECT T1PK1, T1KV1, T1KV2 FROM " + table1); stmt.executeUpdate(); @@ -955,7 +986,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } endTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see // the data in this query. PreparedStatement stmt = conn.prepareStatement("SELECT T2KV1, T2KV2 FROM " + table2 + " WHERE T2PK1 = ? AND T2PK2 >= ? AND T2PK2 <= ?"); @@ -970,7 +1001,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } startTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Upsert select into table3 by not selecting the row timestamp column. 
In this case, the rowtimestamp column would end up being set to the server timestamp PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table3 + " (T3PK1, T3KV1, T3KV2) SELECT T2PK1, T2KV1, T2KV2 FROM " + table2); stmt.executeUpdate(); @@ -978,7 +1009,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } endTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see // the data in this query. PreparedStatement stmt = conn.prepareStatement("SELECT T3KV1, T3KV2 FROM " + table3 + " WHERE T3PK1 = ? AND T3PK2 >= ? AND T3PK2 <= ?"); @@ -997,7 +1028,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { public void testUpsertSelectAutoCommitWithRowTimestampColumn() throws Exception { String tableName1 = generateUniqueName(); String tableName2 = generateUniqueName(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("CREATE TABLE " + tableName1 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP, PK3)) "); conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP, PK3)) "); } @@ -1007,7 +1040,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { // Upsert data with the row timestamp value set long rowTimestamp1 = 100; Date rowTimestampDate = new Date(rowTimestamp1); - try (Connection conn = 
DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, PK2, PK3, KV1) VALUES(?, ?, ?, ?)"); stmt.setInt(1, 1); stmt.setDate(2, rowTimestampDate); @@ -1018,7 +1051,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } long startTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(true); // Upsert select in the same table with the row_timestamp column PK2 not specified. // This will end up creating a new row whose timestamp is the server time stamp @@ -1027,7 +1060,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } long endTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Verify the row that was upserted above PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE PK1 = ? AND PK2 >= ? AND PK2<= ? AND PK3 = ?"); stmt.setInt(1, 1); @@ -1046,13 +1079,13 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { assertEquals(2, rs.getInt(1)); } - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(true); // Upsert select in the same table with the row_timestamp column PK2 specified. This will not end up creating a new row // because the destination pk columns, including the row timestamp column PK2, are the same as the source column. 
conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " (PK1, PK2, PK3, KV1) SELECT PK1, PK2, PK3, KV1 FROM " + tableName); } - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Verify that two rows were created. One with rowtimestamp1 and the other with rowtimestamp2 ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName); assertTrue(rs.next()); @@ -1071,7 +1104,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { String baseTableIdx = generateUniqueName(); String tenantViewIdx = generateUniqueName(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("CREATE IMMUTABLE TABLE " + baseTable + " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR, KV2 VARCHAR, KV3 VARCHAR CONSTRAINT PK PRIMARY KEY(TENANT_ID, PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT = true, SALT_BUCKETS = 8"); conn.createStatement().execute("CREATE INDEX " + baseTableIdx + " ON " + baseTable + " (PK2, KV3) INCLUDE (KV1)"); conn.createStatement().execute("CREATE VIEW " + globalView + " AS SELECT * FROM " + baseTable + " WHERE KV1 = 'KV1'"); @@ -1085,7 +1120,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { // upsert data into base table without specifying the row timestamp column PK2 long startTime = EnvironmentEdgeManager.currentTimeMillis(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Upsert select in the same table with the row_timestamp column PK2 not specified. 
This will end up // creating a new row whose timestamp is the latest timestamp (which will be used // for the row key too) @@ -1101,7 +1136,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { long endTime = EnvironmentEdgeManager.currentTimeMillis(); // Verify that we can see data when querying through base table, global view and index on the base table - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Query the base table PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable + " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?"); stmt.setString(1, tenantId); @@ -1195,7 +1230,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { // Verify that the data upserted using the tenant view can now be queried using base table and the base table index Date upsertedDate; - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { // Query the base table PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable + " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ? 
"); stmt.setString(1, tenantId); @@ -1276,11 +1311,13 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { public void testDisallowNegativeValuesForRowTsColumn() throws Exception { String tableName = generateUniqueName(); String tableName2 = generateUniqueName(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)"); conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)"); } - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { long upsertedTs = 100; PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?)"); stmt.setLong(1, upsertedTs); @@ -1288,7 +1325,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { stmt.executeUpdate(); conn.commit(); } - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName2 + " SELECT (PK1 - 500), KV1 FROM " + tableName); stmt.executeUpdate(); fail(); @@ -1300,6 +1337,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { @Test public void testUpsertSelectWithFixedWidthNullByteSizeArray() throws Exception { Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); String t1 = generateUniqueName(); conn.createStatement().execute( @@ -1357,6 +1395,7 @@ public class UpsertSelectIT extends 
ParallelStatsDisabledIT { private void testUpsertSelectWithMultiByteChars(boolean autoCommit) throws Exception { Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(autoCommit); String t1 = generateUniqueName(); @@ -1401,6 +1440,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(false); String t1 = generateUniqueName(); @@ -1428,7 +1468,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { public void testLongCodecUsedForRowTimestamp() throws Exception { String tableName = generateUniqueName(); String indexName = generateUniqueName(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("CREATE IMMUTABLE TABLE " + tableName + " (k1 TIMESTAMP not null, k2 bigint not null, v bigint, constraint pk primary key (k1 row_timestamp, k2)) SALT_BUCKETS = 9"); conn.createStatement().execute( @@ -1492,7 +1534,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { public void testLengthLimitedVarchar() throws Exception { String tableName1 = generateUniqueName(); String tableName2 = generateUniqueName(); - try (Connection conn = DriverManager.getConnection(getUrl())) { + Properties props = new Properties(); + 
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(true); conn.createStatement().execute("create table " + tableName1 + "(name varchar(160) primary key, id varchar(120), address varchar(160))"); conn.createStatement().execute("create table " + tableName2 + "(name varchar(160) primary key, id varchar(10), address varchar(10))"); @@ -1507,8 +1551,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { } } - private static Connection getTenantConnection(String tenantId) throws Exception { + private Connection getTenantConnection(String tenantId) throws Exception { Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, allowServerSideMutations); props.setProperty(TENANT_ID_ATTRIB, tenantId); return DriverManager.getConnection(getUrl(), props); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f6b75942/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 8c9a930..ed21374 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -437,6 +437,9 @@ public class DeleteCompiler { final boolean hasPostProcessing = delete.getLimit() != null; final ConnectionQueryServices services = connection.getQueryServices(); List<QueryPlan> queryPlans; + boolean allowServerMutations = + services.getProps().getBoolean(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, + QueryServicesOptions.DEFAULT_ENABLE_SERVER_SIDE_MUTATIONS); NamedTableNode tableNode = delete.getTable(); String tableName = 
tableNode.getName().getTableName(); String schemaName = tableNode.getName().getSchemaName(); @@ -550,7 +553,8 @@ public class DeleteCompiler { } runOnServer &= queryPlans.get(0).getTableRef().getTable().getType() != PTableType.INDEX; - + runOnServer &= allowServerMutations; + // We need to have all indexed columns available in all immutable indexes in order // to generate the delete markers from the query. We also cannot have any filters // except for our SkipScanFilter for point lookups. http://git-wip-us.apache.org/repos/asf/phoenix/blob/f6b75942/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 4ed0c9a..410ac22 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -353,6 +353,9 @@ public class UpsertCompiler { boolean serverUpsertSelectEnabled = services.getProps().getBoolean(QueryServices.ENABLE_SERVER_UPSERT_SELECT, QueryServicesOptions.DEFAULT_ENABLE_SERVER_UPSERT_SELECT); + boolean allowServerMutations = + services.getProps().getBoolean(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, + QueryServicesOptions.DEFAULT_ENABLE_SERVER_SIDE_MUTATIONS); UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null; boolean useServerTimestampToBe = false; @@ -549,6 +552,7 @@ public class UpsertCompiler { && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1; } + runOnServer &= allowServerMutations; // If we may be able to run on the server, add a hint that favors using the data table // if all else is equal. 
// TODO: it'd be nice if we could figure out in advance if the PK is potentially changing, @@ -1322,7 +1326,7 @@ public class UpsertCompiler { public MutationState execute() throws SQLException { ResultIterator iterator = queryPlan.iterator(); if (parallelIteratorFactory == null) { - return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false); + return upsertSelect(new StatementContext(statement, queryPlan.getContext().getScan()), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false); } try { parallelIteratorFactory.setRowProjector(projector); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f6b75942/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index a8bbc22..78b72a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -307,6 +307,9 @@ public interface QueryServices extends SQLCloseable { // whether to enable server side RS -> RS calls for upsert select statements public static final String ENABLE_SERVER_UPSERT_SELECT ="phoenix.client.enable.server.upsert.select"; + // whether to trigger mutations on the server at all (UPSERT/DELETE or DELETE FROM) + public static final String ENABLE_SERVER_SIDE_MUTATIONS ="phoenix.client.enable.server.mutations"; + //Update Cache Frequency default config attribute public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB = "phoenix.default.update.cache.frequency"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f6b75942/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index d2f5bce..4e507d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -362,6 +362,9 @@ public class QueryServicesOptions { // RS -> RS calls for upsert select statements are disabled by default public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false; + // By default generally allow server trigger mutations + public static final boolean DEFAULT_ENABLE_SERVER_SIDE_MUTATIONS = true; + public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false; public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name(); public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0";