Repository: phoenix
Updated Branches:
  refs/heads/txn d3b85048a -> 746c6d8ea


Enhance PartialCommitIT.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b198d6b3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b198d6b3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b198d6b3

Branch: refs/heads/txn
Commit: b198d6b39fbee1065ecc682220877bf29c2c6242
Parents: d3b8504
Author: Thomas D'Silva <[email protected]>
Authored: Mon Nov 16 12:57:39 2015 -0800
Committer: Thomas D'Silva <[email protected]>
Committed: Tue Nov 17 13:52:51 2015 -0800

----------------------------------------------------------------------
 .../apache/phoenix/execute/PartialCommitIT.java | 149 +++++++++++--------
 .../apache/phoenix/execute/MutationState.java   |  12 +-
 2 files changed, 92 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b198d6b3/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 11a6e67..a87761e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -22,8 +22,6 @@ package org.apache.phoenix.execute;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Sets.newHashSet;
 import static java.util.Collections.singletonList;
-import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -39,6 +37,8 @@ import java.sql.Driver;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,38 +58,37 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-@Category(NeedsOwnMiniClusterTest.class)
-public class PartialCommitIT {
+@RunWith(Parameterized.class)
+public class PartialCommitIT extends BaseTest {
     
-    private static final String TABLE_NAME_TO_FAIL = 
"b_failure_table".toUpperCase();
-    private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
-    private static final String UPSERT_TO_FAIL = "upsert into " + 
TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
-    private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + 
TABLE_NAME_TO_FAIL + " select k, c from a_success_table";
-    private static final String DELETE_TO_FAIL = "delete from " + 
TABLE_NAME_TO_FAIL + "  where k='z'";
+       private final String A_SUCESS_TABLE;
+       private final String B_FAILURE_TABLE;
+       private final String C_SUCESS_TABLE;
+    private final String UPSERT_TO_FAIL;
+    private final String UPSERT_SELECT_TO_FAIL;
+    private final String DELETE_TO_FAIL;
+    private static final String TABLE_NAME_TO_FAIL = "B_FAILURE_TABLE";
+    private static final byte[] ROW_TO_FAIL_UPSERT_BYTES = Bytes.toBytes("fail 
me upsert");
+    private static final byte[] ROW_TO_FAIL_DELETE_BYTES = Bytes.toBytes("fail 
me delete");
     private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-    private static String url;
     private static Driver driver;
-    private static final Properties props = new Properties();
-    
-    static {
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
-    }
     
     @BeforeClass
     public static void setupCluster() throws Exception {
@@ -106,27 +106,49 @@ public class PartialCommitIT {
       // Must update config before starting server
       props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
       driver = initAndRegisterDriver(url, new 
ReadOnlyProps(props.entrySet().iterator()));
+      clusterInitialized = true;
+      setupTxManager();
       createTablesWithABitOfData();
     }
     
+    @Parameters(name="transactional = {0}")
+    public static Collection<Boolean> data() {
+        return Arrays.asList(false, true);
+    }
+    
+    public PartialCommitIT(boolean transactional) {
+               if (transactional) {
+                       A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN";
+                       B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN";
+                       C_SUCESS_TABLE = "C_SUCCESS_TABLE_TXN";
+               }
+               else {
+                       A_SUCESS_TABLE = "A_SUCCESS_TABLE";
+                       B_FAILURE_TABLE = TABLE_NAME_TO_FAIL;
+                       C_SUCESS_TABLE = "C_SUCCESS_TABLE";
+               }
+           UPSERT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " values ('" + 
Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')";
+           UPSERT_SELECT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " select 
k, c from a_success_table";
+           DELETE_TO_FAIL = "delete from " + B_FAILURE_TABLE + "  where k='" + 
Bytes.toString(ROW_TO_FAIL_DELETE_BYTES) + "'";
+       }
+    
     private static void createTablesWithABitOfData() throws Exception {
-        Properties props = new Properties();
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
-
         try (Connection con = driver.connect(url, new Properties())) {
             Statement sta = con.createStatement();
             sta.execute("create table a_success_table (k varchar primary key, 
c varchar)");
             sta.execute("create table b_failure_table (k varchar primary key, 
c varchar)");
             sta.execute("create table c_success_table (k varchar primary key, 
c varchar)");
+            sta.execute("create table a_success_table_txn (k varchar primary 
key, c varchar) TRANSACTIONAL=true");
+            sta.execute("create table b_failure_table_txn (k varchar primary 
key, c varchar) TRANSACTIONAL=true");
+            sta.execute("create table c_success_table_txn (k varchar primary 
key, c varchar) TRANSACTIONAL=true");
             con.commit();
         }
 
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
-
         try (Connection con = driver.connect(url, new Properties())) {
             con.setAutoCommit(false);
             Statement sta = con.createStatement();
-            for (String table : newHashSet("a_success_table", 
TABLE_NAME_TO_FAIL, "c_success_table")) {
+            for (String table : newHashSet("a_success_table", 
"b_failure_table", "c_success_table", 
+                       "a_success_table_txn", "b_failure_table_txn", 
"c_success_table_txn")) {
                 sta.execute("upsert into " + table + " values ('z', 'z')");
                 sta.execute("upsert into " + table + " values ('zz', 'zz')");
                 sta.execute("upsert into " + table + " values ('zzz', 'zzz')");
@@ -142,46 +164,44 @@ public class PartialCommitIT {
     
     @Test
     public void testNoFailure() {
-        testPartialCommit(singletonList("upsert into a_success_table values 
('testNoFailure', 'a')"), 0, new int[0], false,
-                                        singletonList("select count(*) from 
a_success_table where k='testNoFailure'"), singletonList(new Integer(1)));
+        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " 
values ('testNoFailure', 'a')"), 0, new int[0], false,
+                                        singletonList("select count(*) from " 
+ A_SUCESS_TABLE + " where k='testNoFailure'"), singletonList(new Integer(1)));
     }
     
     @Test
     public void testUpsertFailure() {
-        testPartialCommit(newArrayList("upsert into a_success_table values 
('testUpsertFailure1', 'a')", 
+        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " 
values ('testUpsertFailure1', 'a')", 
                                        UPSERT_TO_FAIL, 
-                                       "upsert into a_success_table values 
('testUpsertFailure2', 'b')"), 
+                                       "upsert into " + A_SUCESS_TABLE + " 
values ('testUpsertFailure2', 'b')"), 
                                        1, new int[]{1}, true,
-                                       newArrayList("select count(*) from 
a_success_table where k like 'testUpsertFailure_'",
-                                                    "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                       newArrayList("select count(*) from " + 
A_SUCESS_TABLE + " where k like 'testUpsertFailure_'",
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"), 
                                        newArrayList(new Integer(2), new 
Integer(0)));
     }
     
     @Test
     public void testUpsertSelectFailure() throws SQLException {
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
-
         try (Connection con = driver.connect(url, new Properties())) {
-            con.createStatement().execute("upsert into a_success_table values 
('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')");
+            con.createStatement().execute("upsert into " + A_SUCESS_TABLE + " 
values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')");
             con.commit();
         }
         
-        testPartialCommit(newArrayList("upsert into a_success_table values 
('testUpsertSelectFailure', 'a')", 
+        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " 
values ('testUpsertSelectFailure', 'a')", 
                                        UPSERT_SELECT_TO_FAIL), 
                                        1, new int[]{1}, true,
-                                       newArrayList("select count(*) from 
a_success_table where k in ('testUpsertSelectFailure', '" + 
Bytes.toString(ROW_TO_FAIL) + "')",
-                                                    "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                       newArrayList("select count(*) from " + 
A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + 
Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')",
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"), 
                                        newArrayList(new Integer(2), new 
Integer(0)));
     }
     
     @Test
     public void testDeleteFailure() {
-        testPartialCommit(newArrayList("upsert into a_success_table values 
('testDeleteFailure1', 'a')", 
+        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " 
values ('testDeleteFailure1', 'a')", 
                                        DELETE_TO_FAIL,
-                                       "upsert into a_success_table values 
('testDeleteFailure2', 'b')"), 
+                                       "upsert into " + A_SUCESS_TABLE + " 
values ('testDeleteFailure2', 'b')"), 
                                        1, new int[]{1}, true,
-                                       newArrayList("select count(*) from 
a_success_table where k like 'testDeleteFailure_'",
-                                                    "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = 'z'"), 
+                                       newArrayList("select count(*) from " + 
A_SUCESS_TABLE + " where k like 'testDeleteFailure_'",
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = 'z'"), 
                                        newArrayList(new Integer(2), new 
Integer(1)));
     }
     
@@ -190,27 +210,27 @@ public class PartialCommitIT {
      */
     @Test
     public void testOrderOfMutationsIsPredicatable() {
-        testPartialCommit(newArrayList("upsert into c_success_table values 
('testOrderOfMutationsIsPredicatable', 'c')", // will fail because 
c_success_table is after b_failure_table by table sort order
+        testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " 
values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because 
c_success_table is after b_failure_table by table sort order
                                        UPSERT_TO_FAIL, 
-                                       "upsert into a_success_table values 
('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because 
a_success_table is before b_failure_table by table sort order
+                                       "upsert into " + A_SUCESS_TABLE + " 
values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because 
a_success_table is before b_failure_table by table sort order
                                        2, new int[]{0,1}, true,
-                                       newArrayList("select count(*) from 
c_success_table where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from 
a_success_table where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                       newArrayList("select count(*) from " + 
C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
+                                                    "select count(*) from " + 
A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"), 
                                        newArrayList(new Integer(0), new 
Integer(1), new Integer(0)));
     }
     
     @Test
-    public void checkThatAllStatementTypesMaintainOrderInConnection() {
-        testPartialCommit(newArrayList("upsert into a_success_table values 
('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", 
-                                       "upsert into a_success_table select k, 
c from c_success_table",
+    public void testStatementOrderMaintainedInConnection() {
+        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " 
values ('testStatementOrderMaintainedInConnection', 'a')", 
+                                       "upsert into " + A_SUCESS_TABLE + " 
select k, c from " + C_SUCESS_TABLE,
                                        DELETE_TO_FAIL,
-                                       "select * from a_success_table", 
+                                       "select * from " + A_SUCESS_TABLE + "", 
                                        UPSERT_TO_FAIL), 
                                        2, new int[]{2,4}, true,
-                                       newArrayList("select count(*) from 
a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", 
// rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
-                                                    "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'",
-                                                    "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = 'z'"), 
+                                       newArrayList("select count(*) from " + 
A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 
'z%'", // rows left: zz, zzz, 
checkThatAllStatementTypesMaintainOrderInConnection
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'",
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = 'z'"), 
                                        newArrayList(new Integer(4), new 
Integer(0), new Integer(1)));
     }
     
@@ -241,6 +261,12 @@ public class PartialCommitIT {
                 assertArrayEquals(expectedUncommittedStatementIndexes, 
uncommittedStatementIndexes);
             }
             
+            ResultSet rs1 = sta.executeQuery("select * from A_SUCCESS_TABLE 
where k='checkThatAllStatementTypesMaintainOrderInConnection' or k like 'z%'");
+            while (rs1.next()) {
+                   System.out.println(rs1.getString(1));
+                   System.out.println(rs1.getString(2));
+            }
+            
             // verify data in HBase
             for (int i = 0; i < countStatementsForVerification.size(); i++) {
                 String countStatement = countStatementsForVerification.get(i);
@@ -259,7 +285,7 @@ public class PartialCommitIT {
         Connection con = driver.connect(url, new Properties());
         PhoenixConnection phxCon = new 
PhoenixConnection(con.unwrap(PhoenixConnection.class));
         final 
Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = 
Maps.newTreeMap(new TableRefComparator());
-        // passing null mutation state forces the 
connection.newMutationState() to be used to create the MutationState
+        // passing a null mutation staate forces the 
connection.newMutationState() to be used to create the MutationState
         return new PhoenixConnection(phxCon, null) {
             @Override
             protected MutationState newMutationState(int maxSize) {
@@ -272,7 +298,7 @@ public class PartialCommitIT {
         @Override
         public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, 
Put put, WALEdit edit,
                 final Durability durability) throws HBaseIOException {
-            if (shouldFailUpsert(c, put)) {
+            if (shouldFail(c, put)) {
                 // throwing anything other than instances of IOException result
                 // in this coprocessor being unloaded
                 // DoNotRetryIOException tells HBase not to retry this mutation
@@ -284,7 +310,7 @@ public class PartialCommitIT {
         @Override
         public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c,
                 Delete delete, WALEdit edit, Durability durability) throws 
IOException {
-            if (shouldFailDelete(c, delete)) {
+            if (shouldFail(c, delete)) {
                 // throwing anything other than instances of IOException result
                 // in this coprocessor being unloaded
                 // DoNotRetryIOException tells HBase not to retry this mutation
@@ -293,18 +319,13 @@ public class PartialCommitIT {
             }
         }
         
-        private boolean 
shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+        private boolean 
shouldFail(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) {
             String tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-            return TABLE_NAME_TO_FAIL.equals(tableName) && 
Bytes.equals(ROW_TO_FAIL, put.getRow());
+            // deletes on transactional tables are converted to put, so use a 
single helper method
+            return tableName.contains(TABLE_NAME_TO_FAIL) && 
+                       (Bytes.equals(ROW_TO_FAIL_UPSERT_BYTES, m.getRow()) || 
Bytes.equals(ROW_TO_FAIL_DELETE_BYTES, m.getRow()));
         }
         
-        private boolean 
shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete 
delete) {
-            String tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-            return TABLE_NAME_TO_FAIL.equals(tableName) &&  
-                // Phoenix deletes are sent as Mutations with empty values
-                
delete.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0 
&&
-                
delete.getFamilyCellMap().firstEntry().getValue().get(0).getQualifierLength() 
== 0;
-        }
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b198d6b3/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 8b5bd14..8836249 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -745,7 +745,7 @@ public class MutationState implements SQLCloseable {
                    while (mutationsIterator.hasNext()) {
                        Pair<byte[],List<Mutation>> pair = 
mutationsIterator.next();
                        byte[] htableName = pair.getFirst();
-                       List<Mutation> mutations = pair.getSecond();
+                       List<Mutation> mutationList = pair.getSecond();
                        
                        //create a span per target table
                        //TODO maybe we can be smarter about the table name to 
string here?
@@ -756,7 +756,7 @@ public class MutationState implements SQLCloseable {
                        do {
                            ServerCache cache = null;
                            if (isDataTable) {
-                               cache = setMetaDataOnMutations(tableRef, 
mutations, indexMetaDataPtr);
+                               cache = setMetaDataOnMutations(tableRef, 
mutationList, indexMetaDataPtr);
                            }
                        
                            // If we haven't retried yet, retry for this case 
only, as it's possible that
@@ -785,19 +785,19 @@ public class MutationState implements SQLCloseable {
                                    }
                                    hTable = txnAware;
                                }
-                               long numMutations = mutations.size();
+                               long numMutations = mutationList.size();
                             GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                             
                             long startTime = System.currentTimeMillis();
                             child.addTimelineAnnotation("Attempt " + 
retryCount);;
-                               hTable.batch(mutations);
+                               hTable.batch(mutationList);
                                child.stop();
                                child.stop();
                             shouldRetry = false;
                             long mutationCommitTime = 
System.currentTimeMillis() - startTime;
                             
GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                             
-                            long mutationSizeBytes = 
calculateMutationSize(mutations);
+                            long mutationSizeBytes = 
calculateMutationSize(mutationList);
                             MutationMetric mutationsMetric = new 
MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                             
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), 
mutationsMetric);
                            } catch (Exception e) {
@@ -840,6 +840,8 @@ public class MutationState implements SQLCloseable {
                                        }
                                    } 
                                    if (sqlE != null) {
+                                       // clear pending mutations
+                                       mutations.clear();
                                        throw sqlE;
                                    }
                                }

Reply via email to