This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 3ee1470633 PHOENIX-6821: Optimize batching in auto-commit mode (#1587)
3ee1470633 is described below

commit 3ee1470633bdc214c19616c01441436019a79e5e
Author: Hari Krishna Dara <harid...@gmail.com>
AuthorDate: Tue Apr 4 04:43:55 2023 +0530

    PHOENIX-6821: Optimize batching in auto-commit mode (#1587)
    
    * PHOENIX-6821: Optimize batching in auto-commit mode
    
    Squash of the change from PR https://github.com/apache/phoenix/pull/1570
    
    * Fix a merge issue with cherry-pick
    
    * Additional changes needed to fix compilation errors for 5.1 branch
    
    * Fix checkstyle errors
    
    * Fix checkstyle errors
    
    * Remove extraneous whitespace
---
 .../org/apache/phoenix/end2end/UpsertValuesIT.java | 182 ++++++++++++++++++---
 .../phoenix/exception/BatchUpdateExecution.java    |  36 ----
 .../apache/phoenix/exception/SQLExceptionCode.java |  21 ++-
 .../org/apache/phoenix/execute/MutationState.java  |   3 +
 .../phoenix/jdbc/PhoenixPreparedStatement.java     |  26 +--
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  39 +++--
 .../phoenix/jdbc/PhoenixPreparedStatementTest.java |  93 -----------
 ...tatementTest.java => PhoenixStatementTest.java} | 141 ++++++++++------
 8 files changed, 307 insertions(+), 234 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 0fb46b1abe..408e7ca0b0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -17,16 +17,19 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.util.PhoenixRuntime.REQUEST_METRIC_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.TestUtil.closeStatement;
 import static org.apache.phoenix.util.TestUtil.closeStmtAndConn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
@@ -36,6 +39,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.client.Result;
@@ -45,6 +49,8 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.DateUtil;
@@ -55,6 +61,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.function.ThrowingRunnable;
 
 
 @Category(ParallelStatsDisabledTest.class)
@@ -454,23 +461,25 @@ public class UpsertValuesIT extends 
ParallelStatsDisabledIT {
              closeStmtAndConn(stmt, conn);
         }
     }
-        
-    
-    @Test
-    public void testBatchedUpsert() throws Exception {
+
+    private void testBatchedUpsert(boolean autocommit) throws Exception {
         String tableName = generateUniqueName();
         Properties props = new Properties();
+        props.setProperty(REQUEST_METRIC_ATTRIB, "true");
+        props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
         Connection conn = null;
         PreparedStatement pstmt = null;
+        Statement stmt = null;
         try {
             conn = DriverManager.getConnection(getUrl(), props);
             conn.createStatement().execute("create table " + tableName + " (k 
varchar primary key, v integer)");
         } finally {
             closeStmtAndConn(pstmt, conn);
         }
-        
+
         try {
             conn = DriverManager.getConnection(getUrl(), props);
+            conn.setAutoCommit(autocommit);
             pstmt = conn.prepareStatement("upsert into " + tableName + " 
values (?, ?)");
             pstmt.setString(1, "a");
             pstmt.setInt(2, 1);
@@ -478,12 +487,21 @@ public class UpsertValuesIT extends 
ParallelStatsDisabledIT {
             pstmt.setString(1, "b");
             pstmt.setInt(2, 2);
             pstmt.addBatch();
+            pstmt.setString(1, "c");
+            pstmt.setInt(2, 3);
+            pstmt.addBatch();
             pstmt.executeBatch();
-            conn.commit();
+            if (!autocommit) {
+                conn.commit();
+            }
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            Map<String, Map<MetricType, Long>> mutationMetrics = 
pConn.getMutationMetrics();
+            Assert.assertEquals(3, (long) 
mutationMetrics.get(tableName).get(MetricType.MUTATION_BATCH_SIZE));
+            Assert.assertEquals(autocommit, conn.getAutoCommit());
         } finally {
-             closeStmtAndConn(pstmt, conn);
+            closeStmtAndConn(pstmt, conn);
         }
-        
+
         try {
             conn = DriverManager.getConnection(getUrl(), props);
             pstmt = conn.prepareStatement("select * from " + tableName);
@@ -494,49 +512,161 @@ public class UpsertValuesIT extends 
ParallelStatsDisabledIT {
             assertTrue(rs.next());
             assertEquals("b", rs.getString(1));
             assertEquals(2, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
             assertFalse(rs.next());
         } finally {
-             closeStmtAndConn(pstmt, conn);
+            closeStmtAndConn(pstmt, conn);
         }
-        
-        conn = DriverManager.getConnection(getUrl(), props);
-        Statement stmt = conn.createStatement();
+
         try {
-            stmt.addBatch("upsert into " + tableName + " values ('c', 3)");
-            stmt.addBatch("select count(*) from " + tableName);
-            stmt.addBatch("upsert into " + tableName + " values ('a', 4)");
+            conn = DriverManager.getConnection(getUrl(), props);
+            conn.setAutoCommit(autocommit);
+            stmt = conn.createStatement();
+            stmt.addBatch("upsert into " + tableName + " values ('d', 4)");
+            stmt.addBatch("upsert into " + tableName + " values ('a', 5)");
             ResultSet rs = stmt.executeQuery("select count(*) from " + 
tableName);
             assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
+            assertEquals(3, rs.getInt(1));
             int[] result = stmt.executeBatch();
-            assertEquals(3,result.length);
+            assertEquals(2, result.length);
             assertEquals(result[0], 1);
-            assertEquals(result[1], -2);
-            assertEquals(result[2], 1);
+            assertEquals(result[1], 1);
             conn.commit();
         } finally {
-             closeStmtAndConn(pstmt, conn);
+            closeStmtAndConn(stmt, conn);
         }
-        
+
         try {
             conn = DriverManager.getConnection(getUrl(), props);
-            pstmt = conn.prepareStatement("select * from " + tableName);
-            ResultSet rs = pstmt.executeQuery();
+            stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("select * from " + tableName);
             assertTrue(rs.next());
             assertEquals("a", rs.getString(1));
-            assertEquals(4, rs.getInt(2));
+            assertEquals(5, rs.getInt(2));
             assertTrue(rs.next());
             assertEquals("b", rs.getString(1));
             assertEquals(2, rs.getInt(2));
             assertTrue(rs.next());
             assertEquals("c", rs.getString(1));
             assertEquals(3, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("d", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
             assertFalse(rs.next());
         } finally {
-             closeStmtAndConn(pstmt, conn);
+            closeStmtAndConn(stmt, conn);
         }
+
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            conn.setAutoCommit(autocommit);
+            stmt = conn.createStatement();
+            stmt.addBatch("delete from " + tableName + " where v <= 4");
+            stmt.addBatch("delete from " + tableName + " where v = 5");
+            int[] result = stmt.executeBatch();
+            assertEquals(2, result.length);
+            assertEquals(result[0], 3);
+            assertEquals(result[1], 1);
+            conn.commit();
+        } finally {
+            closeStmtAndConn(stmt, conn);
+        }
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            pstmt = conn.prepareStatement("select count(*) from " + tableName);
+            ResultSet rs = pstmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+        } finally {
+            closeStmtAndConn(stmt, conn);
+        }
+
     }
-    
+
+    @Test
+    public void testBatchedUpsert() throws Exception {
+        testBatchedUpsert(false);
+    }
+
+    @Test
+    public void testBatchedUpsertAutoCommit() throws Exception {
+        testBatchedUpsert(true);
+    }
+
+    @Test
+    public void testBatchedUpsertMultipleBatches() throws Exception {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("create table " + tableName + " (k 
varchar primary key, v integer)");
+            PreparedStatement pstmt = conn.prepareStatement("upsert into " + 
tableName + " values (?, ?)");
+            pstmt.setString(1, "a");
+            pstmt.setInt(2, 1);
+            pstmt.addBatch();
+            pstmt.executeBatch();
+            pstmt.setString(1, "b");
+            pstmt.setInt(2, 2);
+            pstmt.addBatch();
+            pstmt.executeBatch();
+            conn.commit();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            Statement stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("select count(*) from " + 
tableName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+        }
+    }
+
+    private void testBatchRollback(boolean autocommit) throws Exception {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("create table " + tableName + " (k 
varchar primary key, v integer)");
+            conn.setAutoCommit(autocommit);
+            PreparedStatement pstmt = conn.prepareStatement("upsert into " + 
tableName + " values (?, ?)");
+            pstmt.setString(1, "a");
+            pstmt.setInt(2, 1);
+            pstmt.addBatch();
+            pstmt.executeBatch();
+            conn.rollback();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            Statement stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("select count(*) from " + 
tableName);
+            assertTrue(rs.next());
+            assertEquals(autocommit ? 1 : 0, rs.getInt(1));
+        }
+    }
+
+    @Test
+    public void testBatchRollback() throws Exception {
+        testBatchRollback(false);
+    }
+
+    @Test
+    public void testBatchNoRollbackWithAutoCommit() throws Exception {
+        testBatchRollback(true);
+    }
+
+    @Test
+    public void testDQLFailsInBatch() throws Exception {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("create table " + tableName + " (k 
varchar primary key, v integer)");
+            Statement stmt = conn.createStatement();
+            stmt.addBatch("select * from " + tableName);
+            BatchUpdateException ex = assertThrows(BatchUpdateException.class, 
() -> stmt.executeBatch());
+            assertEquals("java.sql.BatchUpdateException: ERROR 1151 (XCL51): A 
batch operation can't include a statement that produces result sets.",
+                    ex.getMessage());
+        }
+    }
+
     private static Date toDate(String dateString) {
         return DateUtil.parseDate(dateString);
     }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
deleted file mode 100644
index d3cd82b54a..0000000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.exception;
-
-import java.sql.SQLException;
-
-public class BatchUpdateExecution extends SQLException {
-    private static final long serialVersionUID = 1L;
-    private static SQLExceptionCode code = SQLExceptionCode.BATCH_EXCEPTION;
-    private final int batchIndex;
-
-    public BatchUpdateExecution(Throwable cause, int batchIndex) {
-        super(new SQLExceptionInfo.Builder(code).build().toString(),
-                code.getSQLState(), code.getErrorCode(), cause);
-        this.batchIndex = batchIndex;
-    }
-
-    public int getBatchIndex() {
-        return batchIndex;
-    }
-}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 3907ebd555..26b17e6964 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.exception;
 
+import java.sql.BatchUpdateException;
 import java.sql.SQLException;
 import java.sql.SQLTimeoutException;
 import java.util.Map;
@@ -444,6 +445,8 @@ public enum SQLExceptionCode {
             "Duplicate ENCODED_QUALIFIER."),
     MISSING_CQ(1150, "XCL49",
             "Missing ENCODED_QUALIFIER."),
+    EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation 
can't include a "
+            + "statement that produces result sets.", 
Factory.BATCH_UPDATE_ERROR),
 
 
     /**
@@ -611,15 +614,16 @@ public enum SQLExceptionCode {
     }
 
     public static interface Factory {
-        public static final Factory DEFAULT = new Factory() {
+        Factory DEFAULT = new Factory() {
 
             @Override
             public SQLException newException(SQLExceptionInfo info) {
-                return new SQLException(info.toString(), 
info.getCode().getSQLState(), info.getCode().getErrorCode(), 
info.getRootCause());
+                return new SQLException(info.toString(), 
info.getCode().getSQLState(),
+                        info.getCode().getErrorCode(), info.getRootCause());
             }
             
         };
-        public static final Factory SYNTAX_ERROR = new Factory() {
+        Factory SYNTAX_ERROR = new Factory() {
 
             @Override
             public SQLException newException(SQLExceptionInfo info) {
@@ -627,7 +631,16 @@ public enum SQLExceptionCode {
             }
             
         };
-        public SQLException newException(SQLExceptionInfo info);
+        Factory BATCH_UPDATE_ERROR = new Factory() {
+
+            @Override
+            public SQLException newException(SQLExceptionInfo info) {
+                return new BatchUpdateException(info.toString(), 
info.getCode().getSQLState(),
+                        info.getCode().getErrorCode(), (int[]) null, 
info.getRootCause());
+            }
+
+        };
+        SQLException newException(SQLExceptionInfo info);
     }
     
     private static final Map<Integer,SQLExceptionCode> errorCodeMap = 
Maps.newHashMapWithExpectedSize(SQLExceptionCode.values().length);
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index e32f5a6e76..04cb379c42 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1946,4 +1946,7 @@ public class MutationState implements SQLCloseable {
         return mutationMetricQueue;
     }
 
+    public boolean isEmpty() {
+        return mutationsMap != null ? mutationsMap.isEmpty() : true;
+    }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index d955777cd8..be0324e0bb 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -160,11 +160,23 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Prepar
         return compileMutation(statement, query);
     }
 
-    boolean execute(boolean batched) throws SQLException {
+    void executeForBatch() throws SQLException {
         throwIfUnboundParameters();
-        if (!batched && statement.getOperation().isMutation() && 
!batch.isEmpty()) {
-            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
-            .build().buildException();
+        if (!statement.getOperation().isMutation()) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET)
+                    .build().buildException();
+        }
+        executeMutation(statement, createAuditQueryLogger(statement, query));
+    }
+
+    @Override
+    public boolean execute() throws SQLException {
+        throwIfUnboundParameters();
+        if (statement.getOperation().isMutation() && !batch.isEmpty()) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
+                    .build().buildException();
         }
         if (statement.getOperation().isMutation()) {
             executeMutation(statement, 
createAuditQueryLogger(statement,query));
@@ -172,12 +184,6 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Prepar
         }
         executeQuery(statement, createQueryLogger(statement,query));
         return true;
-        
-    }
-
-    @Override
-    public boolean execute() throws SQLException {
-        return execute(false);
     }
 
     @Override
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 5562c8d5c5..64e4166171 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -24,6 +24,7 @@ import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQ
 import java.io.File;
 import java.io.IOException;
 import java.io.Reader;
+import java.sql.BatchUpdateException;
 import java.sql.ParameterMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -78,7 +79,6 @@ import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.compile.TraceQueryPlan;
 import org.apache.phoenix.compile.UpsertCompiler;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.exception.BatchUpdateExecution;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.exception.UpgradeRequiredException;
@@ -456,7 +456,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                                 setLastUpdateCount(lastUpdateCount);
                                 setLastUpdateOperation(stmt.getOperation());
                                 
connection.incrementStatementExecutionCounter();
-                                if(queryLogger.isAuditLoggingEnabled()) {
+                                if (queryLogger.isAuditLoggingEnabled()) {
                                     queryLogger.log(QueryLogInfo.TABLE_NAME_I, 
getTargetForAudit(stmt));
                                     
queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString());
                                     
queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount);
@@ -489,7 +489,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                     }, PhoenixContextExecutor.inContext(),
                         Tracing.withTracing(connection, this.toString()));
         } catch (Exception e) {
-            if(queryLogger.isAuditLoggingEnabled()) {
+            if (queryLogger.isAuditLoggingEnabled()) {
                 queryLogger.log(QueryLogInfo.TABLE_NAME_I, 
getTargetForAudit(stmt));
                 queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, 
Throwables.getStackTraceAsString(e));
                 queryLogger.log(QueryLogInfo.QUERY_STATUS_I, 
QueryStatus.FAILED.toString());
@@ -861,7 +861,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
         @SuppressWarnings("unchecked")
         @Override
         public MutationPlan compilePlan(PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
-            if(!getUdfParseNodes().isEmpty()) {
+            if (!getUdfParseNodes().isEmpty()) {
                 stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
             }
                    DeleteCompiler compiler = new DeleteCompiler(stmt, 
this.getOperation());
@@ -1829,26 +1829,41 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
 
     /**
      * Execute the current batch of statements. If any exception occurs
-     * during execution, a org.apache.phoenix.exception.BatchUpdateException
-     * is thrown which includes the index of the statement within the
-     * batch when the exception occurred.
+     * during execution, a {@link java.sql.BatchUpdateException}
+     * is thrown which carries the update counts for statements executed so
+     * far.
      */
     @Override
     public int[] executeBatch() throws SQLException {
         int i = 0;
+        int[] returnCodes = new int [batch.size()];
+        Arrays.fill(returnCodes, -1);
+        boolean autoCommit = connection.getAutoCommit();
+        connection.setAutoCommit(false);
         try {
-            int[] returnCodes = new int [batch.size()];
             for (i = 0; i < returnCodes.length; i++) {
                 PhoenixPreparedStatement statement = batch.get(i);
-                returnCodes[i] = statement.execute(true) ? 
Statement.SUCCESS_NO_INFO : statement.getUpdateCount();
+                statement.executeForBatch();
+                returnCodes[i] = statement.getUpdateCount();
             }
             // Flush all changes in batch if auto flush is true
             flushIfNecessary();
             // If we make it all the way through, clear the batch
             clearBatch();
+            if (autoCommit) {
+                connection.commit();
+            }
             return returnCodes;
-        } catch (Throwable t) {
-            throw new BatchUpdateExecution(t,i);
+        } catch (SQLException t) {
+            if (i == returnCodes.length) {
+                // Exception after for loop, perhaps in commit(), discard 
returnCodes.
+                throw new BatchUpdateException(t);
+            } else {
+                returnCodes[i] = Statement.EXECUTE_FAILED;
+                throw new BatchUpdateException(returnCodes, t);
+            }
+        } finally {
+            connection.setAutoCommit(autoCommit);
         }
     }
 
@@ -1931,7 +1946,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
         TableName tableName = null;
         if (stmt instanceof ExecutableSelectStatement) {
             TableNode from = ((ExecutableSelectStatement)stmt).getFrom();
-            if(from instanceof NamedTableNode) {
+            if (from instanceof NamedTableNode) {
                 tableName = ((NamedTableNode)from).getName();
             }
         } else if (stmt instanceof ExecutableUpsertStatement) {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
index 56a524c6ce..616e3a029a 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
@@ -85,97 +85,4 @@ public class PhoenixPreparedStatementTest extends 
BaseConnectionlessQueryTest {
             
assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), 
e.getErrorCode());
         }
     }
-    
-    @Test
-    /**
-     * Validates that if a user sets the query timeout via the
-     * stmt.setQueryTimeout() JDBC method, we correctly store the timeout
-     * in both milliseconds and seconds.
-     */
-    public void testSettingQueryTimeoutViaJdbc() throws Exception {
-        // Arrange
-        Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " 
+ ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-        
-        // Act
-        stmt.setQueryTimeout(3);
-    
-        // Assert
-        assertEquals(3, stmt.getQueryTimeout());
-        assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
-    }
-    
-    @Test
-    /**
-     * Validates if a user sets the timeout to zero that we store the timeout
-     * in millis as the Integer.MAX_VALUE. 
-     */
-    public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
-        // Arrange
-        Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " 
+ ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-        
-        // Act
-        stmt.setQueryTimeout(0);
-    
-        // Assert
-        assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
-        assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
-    }
-    
-    @Test
-    /**
-     * Validates that is negative value is supplied we set the timeout to the 
default.
-     */
-    public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
-        // Arrange
-        Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " 
+ ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-        PhoenixConnection phoenixConnection = 
connection.unwrap(PhoenixConnection.class);
-        int defaultQueryTimeout = 
phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
 
-            QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
-        
-        // Act
-        stmt.setQueryTimeout(-1);
-    
-        // Assert
-        assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
-        assertEquals(defaultQueryTimeout, 
phoenixStmt.getQueryTimeoutInMillis());
-    }
-    
-    @Test
-    /**
-     * Validates that setting custom phoenix query timeout using
-     * the phoenix.query.timeoutMs config property is honored.
-     */
-    public void testCustomQueryTimeout() throws Exception {
-        // Arrange
-        Properties connectionProperties = new Properties();
-        connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
-        Connection connection = DriverManager.getConnection(getUrl(), 
connectionProperties);
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " 
+ ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-    
-        // Assert
-        assertEquals(3, stmt.getQueryTimeout());
-        assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
-    }
-    
-    @Test
-    public void testZeroCustomQueryTimeout() throws Exception {
-        // Arrange
-        Properties connectionProperties = new Properties();
-        connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
-        Connection connection = DriverManager.getConnection(getUrl(), 
connectionProperties);
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " 
+ ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-    
-        // Assert
-        assertEquals(0, stmt.getQueryTimeout());
-        assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
-    }
-
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
 b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
similarity index 60%
copy from 
phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
copy to 
phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
index 56a524c6ce..2043cd7aab 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
@@ -18,74 +18,56 @@
 package org.apache.phoenix.jdbc;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.sql.*;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
-public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
+public class PhoenixStatementTest extends BaseConnectionlessQueryTest {
 
-    @Test
-    public void testSetParameter_InvalidIndex() throws Exception {
-        Properties connectionProperties = new Properties();
-        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
-
-        PreparedStatement stmt = connection.prepareStatement(
-                "UPSERT INTO " + ATABLE + " (organization_id, entity_id, a_integer) " +
-                        "VALUES (?,?,?)");
-
-        stmt.setString(1, "AAA");
-        stmt.setString(2, "BBB");
-        stmt.setInt(3, 1);
-
-        try {
-            stmt.setString(4, "Invalid bind column");
-            fail("Setting a value for a column that doesn't exist should throw SQLException");
-        } catch (SQLException e) {
-            // Expected exception
-        }
-
-        try {
-            stmt.setString(-1, "Invalid bind column");
-            fail("Setting a value for a column that doesn't exist should throw SQLException");
-        } catch (SQLException e) {
-            // Expected exception
-        }
-    }
-    
     @Test
     public void testMutationUsingExecuteQueryShouldFail() throws Exception {
         Properties connectionProperties = new Properties();
         Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
-        PreparedStatement stmt = connection.prepareStatement("DELETE FROM " + ATABLE);
+        Statement stmt = connection.createStatement();
         try {
-            stmt.executeQuery();
+            stmt.executeQuery("DELETE FROM " + ATABLE);
             fail();
         } catch(SQLException e) {
             assertEquals(SQLExceptionCode.EXECUTE_QUERY_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
         }
     }
-    
+
     @Test
     public void testQueriesUsingExecuteUpdateShouldFail() throws Exception {
         Properties connectionProperties = new Properties();
         Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+        Statement stmt = connection.createStatement();
         try {
-            stmt.executeUpdate();
+            stmt.executeUpdate("SELECT * FROM " + ATABLE);
             fail();
         } catch(SQLException e) {
             assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
         }
     }
-    
+
     @Test
     /**
      * Validates that if a user sets the query timeout via the
@@ -95,17 +77,17 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
     public void testSettingQueryTimeoutViaJdbc() throws Exception {
         // Arrange
         Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+        Statement stmt = connection.createStatement();
         PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-        
+
         // Act
         stmt.setQueryTimeout(3);
-    
+
         // Assert
         assertEquals(3, stmt.getQueryTimeout());
         assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
     }
-    
+
     @Test
     /**
      * Validates if a user sets the timeout to zero that we store the timeout
@@ -114,17 +96,17 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
     public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
         // Arrange
         Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+        Statement stmt = connection.createStatement();
         PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-        
+
         // Act
         stmt.setQueryTimeout(0);
-    
+
         // Assert
         assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
         assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
     }
-    
+
     @Test
     /**
      * Validates that is negative value is supplied we set the timeout to the default.
@@ -132,20 +114,20 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
     public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
         // Arrange
         Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+        Statement stmt = connection.createStatement();
         PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
         PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
         int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
             QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
-        
+
         // Act
         stmt.setQueryTimeout(-1);
-    
+
         // Assert
         assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
         assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis());
     }
-    
+
     @Test
     /**
      * Validates that setting custom phoenix query timeout using
@@ -156,26 +138,79 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
         Properties connectionProperties = new Properties();
         connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
         Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+        Statement stmt = connection.createStatement();
         PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-    
+
         // Assert
         assertEquals(3, stmt.getQueryTimeout());
         assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
     }
-    
+
     @Test
     public void testZeroCustomQueryTimeout() throws Exception {
         // Arrange
         Properties connectionProperties = new Properties();
         connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
         Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+        Statement stmt = connection.createStatement();
         PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-    
+
         // Assert
         assertEquals(0, stmt.getQueryTimeout());
         assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
     }
 
+    @Test
+    public void testExecuteBatchWithFailedStatement() throws Exception {
+        // Arrange
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+        Statement stmt = connection.createStatement();
+        PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class));
+        Whitebox.setInternalState(stmt, "connection", connSpy);
+        List<PhoenixPreparedStatement> batch = Lists.newArrayList(
+                mock(PhoenixPreparedStatement.class),
+                mock(PhoenixPreparedStatement.class),
+                mock(PhoenixPreparedStatement.class));
+        Whitebox.setInternalState(stmt, "batch", batch);
+        final String exMsg = "TEST";
+        when(batch.get(0).getUpdateCount()).thenReturn(1);
+        doThrow(new SQLException(exMsg)).when(batch.get(1)).executeForBatch();
+        // However, we don't expect this to be called.
+        when(batch.get(1).getUpdateCount()).thenReturn(1);
+
+        // Act & Assert
+        BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+        assertEquals(exMsg, ex.getCause().getMessage());
+        int[] updateCounts = ex.getUpdateCounts();
+        assertEquals(3, updateCounts.length);
+        assertEquals(1, updateCounts[0]);
+        assertEquals(Statement.EXECUTE_FAILED, updateCounts[1]);
+        assertEquals(-1, updateCounts[2]);
+        verify(connSpy, never()).commit(); // Ensure commit was never called.
+    }
+
+    @Test
+    public void testExecuteBatchWithCommitFailure() throws Exception {
+        // Arrange
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+        Statement stmt = connection.createStatement();
+        PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class));
+        Whitebox.setInternalState(stmt, "connection", connSpy);
+        List<PhoenixPreparedStatement> batch = Lists.newArrayList(
+                mock(PhoenixPreparedStatement.class));
+        Whitebox.setInternalState(stmt, "batch", batch);
+        final String exMsg = "TEST";
+        doThrow(new SQLException(exMsg)).when(connSpy).commit();
+        when(connSpy.getAutoCommit()).thenReturn(true);
+
+        // Act & Assert
+        BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+        assertEquals(exMsg, ex.getCause().getMessage());
+        assertNull(ex.getUpdateCounts());
+    }
+
 }


Reply via email to