This is an automated email from the ASF dual-hosted git repository. tkhurana pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new 3ee1470633 PHOENIX-6821: Optimize batching in auto-commit mode (#1587) 3ee1470633 is described below commit 3ee1470633bdc214c19616c01441436019a79e5e Author: Hari Krishna Dara <harid...@gmail.com> AuthorDate: Tue Apr 4 04:43:55 2023 +0530 PHOENIX-6821: Optimize batching in auto-commit mode (#1587) * PHOENIX-6821: Optimize batching in auto-commit mode Squash of the change from PR https://github.com/apache/phoenix/pull/1570 * Fix a merge issue with cherry-pick * Additional changes needed to fix compilation errors for 5.1 branch * Fix checkstyle errors * Fix checkstyle errors * Remove extraneous whitespace --- .../org/apache/phoenix/end2end/UpsertValuesIT.java | 182 ++++++++++++++++++--- .../phoenix/exception/BatchUpdateExecution.java | 36 ---- .../apache/phoenix/exception/SQLExceptionCode.java | 21 ++- .../org/apache/phoenix/execute/MutationState.java | 3 + .../phoenix/jdbc/PhoenixPreparedStatement.java | 26 +-- .../org/apache/phoenix/jdbc/PhoenixStatement.java | 39 +++-- .../phoenix/jdbc/PhoenixPreparedStatementTest.java | 93 ----------- ...tatementTest.java => PhoenixStatementTest.java} | 141 ++++++++++------ 8 files changed, 307 insertions(+), 234 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java index 0fb46b1abe..408e7ca0b0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java @@ -17,16 +17,19 @@ */ package org.apache.phoenix.end2end; +import static org.apache.phoenix.util.PhoenixRuntime.REQUEST_METRIC_ATTRIB; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import static org.apache.phoenix.util.TestUtil.closeStatement; import static org.apache.phoenix.util.TestUtil.closeStmtAndConn; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.math.BigDecimal; import java.math.RoundingMode; +import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; @@ -36,6 +39,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.client.Result; @@ -45,6 +49,8 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.SequenceNotFoundException; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.DateUtil; @@ -55,6 +61,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.function.ThrowingRunnable; @Category(ParallelStatsDisabledTest.class) @@ -454,23 +461,25 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { closeStmtAndConn(stmt, conn); } } - - - @Test - public void testBatchedUpsert() throws Exception { + + private void testBatchedUpsert(boolean autocommit) throws Exception { String tableName = generateUniqueName(); Properties props = new Properties(); + props.setProperty(REQUEST_METRIC_ATTRIB, "true"); + props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true"); Connection conn = null; PreparedStatement pstmt = null; + Statement stmt = null; try { conn = DriverManager.getConnection(getUrl(), props); conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)"); } finally { 
closeStmtAndConn(pstmt, conn); } - + try { conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(autocommit); pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)"); pstmt.setString(1, "a"); pstmt.setInt(2, 1); @@ -478,12 +487,21 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { pstmt.setString(1, "b"); pstmt.setInt(2, 2); pstmt.addBatch(); + pstmt.setString(1, "c"); + pstmt.setInt(2, 3); + pstmt.addBatch(); pstmt.executeBatch(); - conn.commit(); + if (!autocommit) { + conn.commit(); + } + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + Map<String, Map<MetricType, Long>> mutationMetrics = pConn.getMutationMetrics(); + Assert.assertEquals(3, (long) mutationMetrics.get(tableName).get(MetricType.MUTATION_BATCH_SIZE)); + Assert.assertEquals(autocommit, conn.getAutoCommit()); } finally { - closeStmtAndConn(pstmt, conn); + closeStmtAndConn(pstmt, conn); } - + try { conn = DriverManager.getConnection(getUrl(), props); pstmt = conn.prepareStatement("select * from " + tableName); @@ -494,49 +512,161 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals("b", rs.getString(1)); assertEquals(2, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("c", rs.getString(1)); + assertEquals(3, rs.getInt(2)); assertFalse(rs.next()); } finally { - closeStmtAndConn(pstmt, conn); + closeStmtAndConn(pstmt, conn); } - - conn = DriverManager.getConnection(getUrl(), props); - Statement stmt = conn.createStatement(); + try { - stmt.addBatch("upsert into " + tableName + " values ('c', 3)"); - stmt.addBatch("select count(*) from " + tableName); - stmt.addBatch("upsert into " + tableName + " values ('a', 4)"); + conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(autocommit); + stmt = conn.createStatement(); + stmt.addBatch("upsert into " + tableName + " values ('d', 4)"); + stmt.addBatch("upsert into " + tableName + " values ('a', 5)"); ResultSet rs = 
stmt.executeQuery("select count(*) from " + tableName); assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); + assertEquals(3, rs.getInt(1)); int[] result = stmt.executeBatch(); - assertEquals(3,result.length); + assertEquals(2, result.length); assertEquals(result[0], 1); - assertEquals(result[1], -2); - assertEquals(result[2], 1); + assertEquals(result[1], 1); conn.commit(); } finally { - closeStmtAndConn(pstmt, conn); + closeStmtAndConn(stmt, conn); } - + try { conn = DriverManager.getConnection(getUrl(), props); - pstmt = conn.prepareStatement("select * from " + tableName); - ResultSet rs = pstmt.executeQuery(); + stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("select * from " + tableName); assertTrue(rs.next()); assertEquals("a", rs.getString(1)); - assertEquals(4, rs.getInt(2)); + assertEquals(5, rs.getInt(2)); assertTrue(rs.next()); assertEquals("b", rs.getString(1)); assertEquals(2, rs.getInt(2)); assertTrue(rs.next()); assertEquals("c", rs.getString(1)); assertEquals(3, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("d", rs.getString(1)); + assertEquals(4, rs.getInt(2)); assertFalse(rs.next()); } finally { - closeStmtAndConn(pstmt, conn); + closeStmtAndConn(stmt, conn); } + + try { + conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(autocommit); + stmt = conn.createStatement(); + stmt.addBatch("delete from " + tableName + " where v <= 4"); + stmt.addBatch("delete from " + tableName + " where v = 5"); + int[] result = stmt.executeBatch(); + assertEquals(2, result.length); + assertEquals(result[0], 3); + assertEquals(result[1], 1); + conn.commit(); + } finally { + closeStmtAndConn(stmt, conn); + } + try { + conn = DriverManager.getConnection(getUrl(), props); + pstmt = conn.prepareStatement("select count(*) from " + tableName); + ResultSet rs = pstmt.executeQuery(); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + } finally { + closeStmtAndConn(stmt, conn); + } + } - + + @Test + public void 
testBatchedUpsert() throws Exception { + testBatchedUpsert(false); + } + + @Test + public void testBatchedUpsertAutoCommit() throws Exception { + testBatchedUpsert(true); + } + + @Test + public void testBatchedUpsertMultipleBatches() throws Exception { + String tableName = generateUniqueName(); + Properties props = new Properties(); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)"); + PreparedStatement pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)"); + pstmt.setString(1, "a"); + pstmt.setInt(2, 1); + pstmt.addBatch(); + pstmt.executeBatch(); + pstmt.setString(1, "b"); + pstmt.setInt(2, 2); + pstmt.addBatch(); + pstmt.executeBatch(); + conn.commit(); + } + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("select count(*) from " + tableName); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + } + } + + private void testBatchRollback(boolean autocommit) throws Exception { + String tableName = generateUniqueName(); + Properties props = new Properties(); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)"); + conn.setAutoCommit(autocommit); + PreparedStatement pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)"); + pstmt.setString(1, "a"); + pstmt.setInt(2, 1); + pstmt.addBatch(); + pstmt.executeBatch(); + conn.rollback(); + } + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("select count(*) from " + tableName); + assertTrue(rs.next()); + assertEquals(autocommit ? 
1 : 0, rs.getInt(1)); + } + } + + @Test + public void testBatchRollback() throws Exception { + testBatchRollback(false); + } + + @Test + public void testBatchNoRollbackWithAutoCommit() throws Exception { + testBatchRollback(true); + } + + @Test + public void testDQLFailsInBatch() throws Exception { + String tableName = generateUniqueName(); + Properties props = new Properties(); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)"); + Statement stmt = conn.createStatement(); + stmt.addBatch("select * from " + tableName); + BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + assertEquals("java.sql.BatchUpdateException: ERROR 1151 (XCL51): A batch operation can't include a statement that produces result sets.", + ex.getMessage()); + } + } + private static Date toDate(String dateString) { return DateUtil.parseDate(dateString); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java deleted file mode 100644 index d3cd82b54a..0000000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.exception; - -import java.sql.SQLException; - -public class BatchUpdateExecution extends SQLException { - private static final long serialVersionUID = 1L; - private static SQLExceptionCode code = SQLExceptionCode.BATCH_EXCEPTION; - private final int batchIndex; - - public BatchUpdateExecution(Throwable cause, int batchIndex) { - super(new SQLExceptionInfo.Builder(code).build().toString(), - code.getSQLState(), code.getErrorCode(), cause); - this.batchIndex = batchIndex; - } - - public int getBatchIndex() { - return batchIndex; - } -} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 3907ebd555..26b17e6964 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.exception; +import java.sql.BatchUpdateException; import java.sql.SQLException; import java.sql.SQLTimeoutException; import java.util.Map; @@ -444,6 +445,8 @@ public enum SQLExceptionCode { "Duplicate ENCODED_QUALIFIER."), MISSING_CQ(1150, "XCL49", "Missing ENCODED_QUALIFIER."), + EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a " + + "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR), /** @@ -611,15 +614,16 @@ public enum SQLExceptionCode { } public static interface Factory { - public static final Factory DEFAULT = new 
Factory() { + Factory DEFAULT = new Factory() { @Override public SQLException newException(SQLExceptionInfo info) { - return new SQLException(info.toString(), info.getCode().getSQLState(), info.getCode().getErrorCode(), info.getRootCause()); + return new SQLException(info.toString(), info.getCode().getSQLState(), + info.getCode().getErrorCode(), info.getRootCause()); } }; - public static final Factory SYNTAX_ERROR = new Factory() { + Factory SYNTAX_ERROR = new Factory() { @Override public SQLException newException(SQLExceptionInfo info) { @@ -627,7 +631,16 @@ public enum SQLExceptionCode { } }; - public SQLException newException(SQLExceptionInfo info); + Factory BATCH_UPDATE_ERROR = new Factory() { + + @Override + public SQLException newException(SQLExceptionInfo info) { + return new BatchUpdateException(info.toString(), info.getCode().getSQLState(), + info.getCode().getErrorCode(), (int[]) null, info.getRootCause()); + } + + }; + SQLException newException(SQLExceptionInfo info); } private static final Map<Integer,SQLExceptionCode> errorCodeMap = Maps.newHashMapWithExpectedSize(SQLExceptionCode.values().length); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index e32f5a6e76..04cb379c42 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -1946,4 +1946,7 @@ public class MutationState implements SQLCloseable { return mutationMetricQueue; } + public boolean isEmpty() { + return mutationsMap != null ? 
mutationsMap.isEmpty() : true; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java index d955777cd8..be0324e0bb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java @@ -160,11 +160,23 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar return compileMutation(statement, query); } - boolean execute(boolean batched) throws SQLException { + void executeForBatch() throws SQLException { throwIfUnboundParameters(); - if (!batched && statement.getOperation().isMutation() && !batch.isEmpty()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) - .build().buildException(); + if (!statement.getOperation().isMutation()) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET) + .build().buildException(); + } + executeMutation(statement, createAuditQueryLogger(statement, query)); + } + + @Override + public boolean execute() throws SQLException { + throwIfUnboundParameters(); + if (statement.getOperation().isMutation() && !batch.isEmpty()) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) + .build().buildException(); } if (statement.getOperation().isMutation()) { executeMutation(statement, createAuditQueryLogger(statement,query)); @@ -172,12 +184,6 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar } executeQuery(statement, createQueryLogger(statement,query)); return true; - - } - - @Override - public boolean execute() throws SQLException { - return execute(false); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
index 5562c8d5c5..64e4166171 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -24,6 +24,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQ import java.io.File; import java.io.IOException; import java.io.Reader; +import java.sql.BatchUpdateException; import java.sql.ParameterMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -78,7 +79,6 @@ import org.apache.phoenix.compile.StatementPlan; import org.apache.phoenix.compile.TraceQueryPlan; import org.apache.phoenix.compile.UpsertCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol; -import org.apache.phoenix.exception.BatchUpdateExecution; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.exception.UpgradeRequiredException; @@ -456,7 +456,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { setLastUpdateCount(lastUpdateCount); setLastUpdateOperation(stmt.getOperation()); connection.incrementStatementExecutionCounter(); - if(queryLogger.isAuditLoggingEnabled()) { + if (queryLogger.isAuditLoggingEnabled()) { queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt)); queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString()); queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount); @@ -489,7 +489,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { }, PhoenixContextExecutor.inContext(), Tracing.withTracing(connection, this.toString())); } catch (Exception e) { - if(queryLogger.isAuditLoggingEnabled()) { + if (queryLogger.isAuditLoggingEnabled()) { queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt)); queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); queryLogger.log(QueryLogInfo.QUERY_STATUS_I, 
QueryStatus.FAILED.toString()); @@ -861,7 +861,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { @SuppressWarnings("unchecked") @Override public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { - if(!getUdfParseNodes().isEmpty()) { + if (!getUdfParseNodes().isEmpty()) { stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes()); } DeleteCompiler compiler = new DeleteCompiler(stmt, this.getOperation()); @@ -1829,26 +1829,41 @@ public class PhoenixStatement implements Statement, SQLCloseable { /** * Execute the current batch of statements. If any exception occurs - * during execution, a org.apache.phoenix.exception.BatchUpdateException - * is thrown which includes the index of the statement within the - * batch when the exception occurred. + * during execution, a {@link java.sql.BatchUpdateException} + * is thrown which contains the update counts for statements executed so + * far. */ @Override public int[] executeBatch() throws SQLException { int i = 0; + int[] returnCodes = new int [batch.size()]; + Arrays.fill(returnCodes, -1); + boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); try { - int[] returnCodes = new int [batch.size()]; for (i = 0; i < returnCodes.length; i++) { PhoenixPreparedStatement statement = batch.get(i); - returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount(); + statement.executeForBatch(); + returnCodes[i] = statement.getUpdateCount(); } // Flush all changes in batch if auto flush is true flushIfNecessary(); // If we make it all the way through, clear the batch clearBatch(); + if (autoCommit) { + connection.commit(); + } return returnCodes; - } catch (Throwable t) { - throw new BatchUpdateExecution(t,i); + } catch (SQLException t) { + if (i == returnCodes.length) { + // Exception after for loop, perhaps in commit(), discard returnCodes. 
+ throw new BatchUpdateException(t); + } else { + returnCodes[i] = Statement.EXECUTE_FAILED; + throw new BatchUpdateException(returnCodes, t); + } + } finally { + connection.setAutoCommit(autoCommit); } } @@ -1931,7 +1946,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { TableName tableName = null; if (stmt instanceof ExecutableSelectStatement) { TableNode from = ((ExecutableSelectStatement)stmt).getFrom(); - if(from instanceof NamedTableNode) { + if (from instanceof NamedTableNode) { tableName = ((NamedTableNode)from).getName(); } } else if (stmt instanceof ExecutableUpsertStatement) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java index 56a524c6ce..616e3a029a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java @@ -85,97 +85,4 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest { assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode()); } } - - @Test - /** - * Validates that if a user sets the query timeout via the - * stmt.setQueryTimeout() JDBC method, we correctly store the timeout - * in both milliseconds and seconds. - */ - public void testSettingQueryTimeoutViaJdbc() throws Exception { - // Arrange - Connection connection = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); - PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - - // Act - stmt.setQueryTimeout(3); - - // Assert - assertEquals(3, stmt.getQueryTimeout()); - assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis()); - } - - @Test - /** - * Validates if a user sets the timeout to zero that we store the timeout - * in millis as the Integer.MAX_VALUE. 
- */ - public void testSettingZeroQueryTimeoutViaJdbc() throws Exception { - // Arrange - Connection connection = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); - PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - - // Act - stmt.setQueryTimeout(0); - - // Assert - assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout()); - assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis()); - } - - @Test - /** - * Validates that is negative value is supplied we set the timeout to the default. - */ - public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception { - // Arrange - Connection connection = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); - PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class); - int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, - QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); - - // Act - stmt.setQueryTimeout(-1); - - // Assert - assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout()); - assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis()); - } - - @Test - /** - * Validates that setting custom phoenix query timeout using - * the phoenix.query.timeoutMs config property is honored. 
- */ - public void testCustomQueryTimeout() throws Exception { - // Arrange - Properties connectionProperties = new Properties(); - connectionProperties.setProperty("phoenix.query.timeoutMs", "2350"); - Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); - PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - - // Assert - assertEquals(3, stmt.getQueryTimeout()); - assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis()); - } - - @Test - public void testZeroCustomQueryTimeout() throws Exception { - // Arrange - Properties connectionProperties = new Properties(); - connectionProperties.setProperty("phoenix.query.timeoutMs", "0"); - Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); - PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - - // Assert - assertEquals(0, stmt.getQueryTimeout()); - assertEquals(0, phoenixStmt.getQueryTimeoutInMillis()); - } - } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java similarity index 60% copy from phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java copy to phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java index 56a524c6ce..2043cd7aab 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java @@ -18,74 +18,56 @@ package org.apache.phoenix.jdbc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doThrow; +import 
static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.sql.*; +import java.util.List; import java.util.Properties; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.query.BaseConnectionlessQueryTest; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; -public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest { +public class PhoenixStatementTest extends BaseConnectionlessQueryTest { - @Test - public void testSetParameter_InvalidIndex() throws Exception { - Properties connectionProperties = new Properties(); - Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); - - PreparedStatement stmt = connection.prepareStatement( - "UPSERT INTO " + ATABLE + " (organization_id, entity_id, a_integer) " + - "VALUES (?,?,?)"); - - stmt.setString(1, "AAA"); - stmt.setString(2, "BBB"); - stmt.setInt(3, 1); - - try { - stmt.setString(4, "Invalid bind column"); - fail("Setting a value for a column that doesn't exist should throw SQLException"); - } catch (SQLException e) { - // Expected exception - } - - try { - stmt.setString(-1, "Invalid bind column"); - fail("Setting a value for a column that doesn't exist should throw SQLException"); - } catch (SQLException e) { - // Expected exception - } - } - @Test public void testMutationUsingExecuteQueryShouldFail() throws Exception { Properties connectionProperties = new Properties(); Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); - PreparedStatement stmt = connection.prepareStatement("DELETE FROM " + ATABLE); + Statement stmt = 
connection.createStatement(); try { - stmt.executeQuery(); + stmt.executeQuery("DELETE FROM " + ATABLE); fail(); } catch(SQLException e) { assertEquals(SQLExceptionCode.EXECUTE_QUERY_NOT_APPLICABLE.getErrorCode(), e.getErrorCode()); } } - + @Test public void testQueriesUsingExecuteUpdateShouldFail() throws Exception { Properties connectionProperties = new Properties(); Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); + Statement stmt = connection.createStatement(); try { - stmt.executeUpdate(); + stmt.executeUpdate("SELECT * FROM " + ATABLE); fail(); } catch(SQLException e) { assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode()); } } - + @Test /** * Validates that if a user sets the query timeout via the @@ -95,17 +77,17 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest { public void testSettingQueryTimeoutViaJdbc() throws Exception { // Arrange Connection connection = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); + Statement stmt = connection.createStatement(); PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - + // Act stmt.setQueryTimeout(3); - + // Assert assertEquals(3, stmt.getQueryTimeout()); assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis()); } - + @Test /** * Validates if a user sets the timeout to zero that we store the timeout @@ -114,17 +96,17 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest { public void testSettingZeroQueryTimeoutViaJdbc() throws Exception { // Arrange Connection connection = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); + Statement stmt = connection.createStatement(); PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - + // 
Act stmt.setQueryTimeout(0); - + // Assert assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout()); assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis()); } - + @Test /** * Validates that is negative value is supplied we set the timeout to the default. @@ -132,20 +114,20 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest { public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception { // Arrange Connection connection = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); + Statement stmt = connection.createStatement(); PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class); int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); - + // Act stmt.setQueryTimeout(-1); - + // Assert assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout()); assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis()); } - + @Test /** * Validates that setting custom phoenix query timeout using @@ -156,26 +138,79 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest { Properties connectionProperties = new Properties(); connectionProperties.setProperty("phoenix.query.timeoutMs", "2350"); Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); + Statement stmt = connection.createStatement(); PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - + // Assert assertEquals(3, stmt.getQueryTimeout()); assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis()); } - + @Test public void testZeroCustomQueryTimeout() throws Exception { // Arrange Properties connectionProperties = new 
Properties(); connectionProperties.setProperty("phoenix.query.timeoutMs", "0"); Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); - PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE); + Statement stmt = connection.createStatement(); PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class); - + // Assert assertEquals(0, stmt.getQueryTimeout()); assertEquals(0, phoenixStmt.getQueryTimeoutInMillis()); } + @Test + public void testExecuteBatchWithFailedStatement() throws Exception { + // Arrange + Properties connectionProperties = new Properties(); + connectionProperties.setProperty("phoenix.query.timeoutMs", "0"); + Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); + Statement stmt = connection.createStatement(); + PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class)); + Whitebox.setInternalState(stmt, "connection", connSpy); + List<PhoenixPreparedStatement> batch = Lists.newArrayList( + mock(PhoenixPreparedStatement.class), + mock(PhoenixPreparedStatement.class), + mock(PhoenixPreparedStatement.class)); + Whitebox.setInternalState(stmt, "batch", batch); + final String exMsg = "TEST"; + when(batch.get(0).getUpdateCount()).thenReturn(1); + doThrow(new SQLException(exMsg)).when(batch.get(1)).executeForBatch(); + // However, we don't expect this to be called. + when(batch.get(1).getUpdateCount()).thenReturn(1); + + // Act & Assert + BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + assertEquals(exMsg, ex.getCause().getMessage()); + int[] updateCounts = ex.getUpdateCounts(); + assertEquals(3, updateCounts.length); + assertEquals(1, updateCounts[0]); + assertEquals(Statement.EXECUTE_FAILED, updateCounts[1]); + assertEquals(-1, updateCounts[2]); + verify(connSpy, never()).commit(); // Ensure commit was never called. 
+ } + + @Test + public void testExecuteBatchWithCommitFailure() throws Exception { + // Arrange + Properties connectionProperties = new Properties(); + connectionProperties.setProperty("phoenix.query.timeoutMs", "0"); + Connection connection = DriverManager.getConnection(getUrl(), connectionProperties); + Statement stmt = connection.createStatement(); + PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class)); + Whitebox.setInternalState(stmt, "connection", connSpy); + List<PhoenixPreparedStatement> batch = Lists.newArrayList( + mock(PhoenixPreparedStatement.class)); + Whitebox.setInternalState(stmt, "batch", batch); + final String exMsg = "TEST"; + doThrow(new SQLException(exMsg)).when(connSpy).commit(); + when(connSpy.getAutoCommit()).thenReturn(true); + + // Act & Assert + BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + assertEquals(exMsg, ex.getCause().getMessage()); + assertNull(ex.getUpdateCounts()); + } + }