PHOENIX-2545 Abort transaction if send fails during commit

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/16e5c7bf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/16e5c7bf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/16e5c7bf

Branch: refs/heads/4.x-HBase-1.0
Commit: 16e5c7bf02a92c466b1730b6fa36371608d44f01
Parents: 55b7ada
Author: James Taylor <[email protected]>
Authored: Sat Dec 26 23:47:31 2015 -0800
Committer: James Taylor <[email protected]>
Committed: Wed Dec 30 17:57:41 2015 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/CreateTableIT.java   |  28 +++
 .../apache/phoenix/end2end/UpsertSelectIT.java  |   2 +-
 .../apache/phoenix/execute/PartialCommitIT.java |  44 ++--
 .../coprocessor/MetaDataEndpointImpl.java       |  13 +-
 .../apache/phoenix/execute/MutationState.java   | 216 ++++++++++++-------
 .../index/PhoenixTransactionalIndexer.java      |  17 +-
 6 files changed, 207 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 7c4576c..5ffc354 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Test;
@@ -407,4 +408,31 @@ public class CreateTableIT extends BaseClientManagedTimeIT 
{
             
assertEquals(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_TTL.getErrorCode(),sqle.getErrorCode());
         }
     }
+    
+    @Test
+    public void testAlterDeletedTable() throws Exception {
+        String ddl = "create table T ("
+                + " K varchar primary key,"
+                + " V1 varchar)";
+        long ts = nextTimestamp();
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute(ddl);
+        conn.close();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts+50));
+        Connection connAt50 = DriverManager.getConnection(getUrl(), props);
+        connAt50.createStatement().execute("DROP TABLE T");
+        connAt50.close();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts+20));
+        Connection connAt20 = DriverManager.getConnection(getUrl(), props);
+        connAt20.createStatement().execute("UPDATE STATISTICS T"); // 
Invalidates from cache
+        try {
+            connAt20.createStatement().execute("ALTER TABLE T ADD V2 VARCHAR");
+            fail();
+        } catch (NewerTableAlreadyExistsException e) {
+            
+        }
+        connAt20.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 689562a..b5252e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1109,7 +1109,7 @@ public class UpsertSelectIT extends 
BaseClientManagedTimeIT {
         }
 
         // upsert data into base table without specifying the row timestamp 
column PK2
-        long upsertedTs = 5;
+        long upsertedTs = nextTimestamp();
         try (Connection conn = getConnection(upsertedTs)) {
             // Upsert select in the same table with the row_timestamp column 
PK2 not specified. This will end up
             // creating a new row whose timestamp is the SCN of the 
connection. The same SCN will be used

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 61aae62..8d7ebcb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -100,7 +100,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         return Arrays.asList(false, true);
     }
     
+    private final boolean transactional;
+    
     public PartialCommitIT(boolean transactional) {
+        this.transactional = transactional;
                if (transactional) {
                        A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN";
                        B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN";
@@ -148,8 +151,8 @@ public class PartialCommitIT extends BaseOwnClusterIT {
     
     @Test
     public void testNoFailure() {
-        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " 
values ('testNoFailure', 'a')"), 0, new int[0], false,
-                                        singletonList("select count(*) from " 
+ A_SUCESS_TABLE + " where k='testNoFailure'"), singletonList(new Integer(1)));
+        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " 
values ('testNoFailure', 'a')"), new int[0], false, singletonList("select 
count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"),
+                                        singletonList(new Integer(1)));
     }
     
     @Test
@@ -157,10 +160,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " 
values ('testUpsertFailure1', 'a')", 
                                        UPSERT_TO_FAIL, 
                                        "upsert into " + A_SUCESS_TABLE + " 
values ('testUpsertFailure2', 'b')"), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1,2} : new 
int[]{1}, true,
                                        newArrayList("select count(*) from " + 
A_SUCESS_TABLE + " where k like 'testUpsertFailure_'",
-                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"), 
-                                       newArrayList(new Integer(2), new 
Integer(0)));
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"),
+                                       transactional ? newArrayList(new 
Integer(0), new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
@@ -172,10 +175,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " 
values ('testUpsertSelectFailure', 'a')", 
                                        UPSERT_SELECT_TO_FAIL), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1} : new 
int[]{1}, true, 
                                        newArrayList("select count(*) from " + 
A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + 
Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')",
-                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"), 
-                                       newArrayList(new Integer(2), new 
Integer(0)));
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"),
+                                       transactional ? newArrayList(new 
Integer(1) /* from commit above */, new Integer(0)) : newArrayList(new 
Integer(2), new Integer(0)));
     }
     
     @Test
@@ -183,10 +186,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " 
values ('testDeleteFailure1', 'a')", 
                                        DELETE_TO_FAIL,
                                        "upsert into " + A_SUCESS_TABLE + " 
values ('testDeleteFailure2', 'b')"), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1,2} : new 
int[]{1}, true, 
                                        newArrayList("select count(*) from " + 
A_SUCESS_TABLE + " where k like 'testDeleteFailure_'",
-                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = 'z'"), 
-                                       newArrayList(new Integer(2), new 
Integer(1)));
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = 'z'"),
+                                       transactional ? newArrayList(new 
Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(2), 
new Integer(1)));
     }
     
     /**
@@ -197,11 +200,11 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " 
values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because 
c_success_table is after b_failure_table by table sort order
                                        UPSERT_TO_FAIL, 
                                        "upsert into " + A_SUCESS_TABLE + " 
values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because 
a_success_table is before b_failure_table by table sort order
-                                       2, new int[]{0,1}, true,
+                                       transactional ? new int[] {0,1,2} : new 
int[]{0,1}, true, 
                                        newArrayList("select count(*) from " + 
C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
                                                     "select count(*) from " + 
A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"), 
-                                       newArrayList(new Integer(0), new 
Integer(1), new Integer(0)));
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'"),
+                                       transactional ? newArrayList(new 
Integer(0), new Integer(0), new Integer(0)) : newArrayList(new Integer(0), new 
Integer(1), new Integer(0)));
     }
     
     @Test
@@ -211,15 +214,15 @@ public class PartialCommitIT extends BaseOwnClusterIT {
                                        DELETE_TO_FAIL,
                                        "select * from " + A_SUCESS_TABLE + "", 
                                        UPSERT_TO_FAIL), 
-                                       2, new int[]{2,4}, true,
+                                       transactional ? new int[] {0,1,2,4} : 
new int[]{2,4}, true, 
                                        newArrayList("select count(*) from " + 
A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 
'z%'", // rows left: zz, zzz, 
checkThatAllStatementTypesMaintainOrderInConnection
                                                     "select count(*) from " + 
B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + 
"'",
-                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = 'z'"), 
-                                       newArrayList(new Integer(4), new 
Integer(0), new Integer(1)));
+                                                    "select count(*) from " + 
B_FAILURE_TABLE + " where k = 'z'"),
+                                       transactional ? newArrayList(new 
Integer(3) /* original rows */, new Integer(0), new Integer(1) /* original row 
*/) : newArrayList(new Integer(4), new Integer(0), new Integer(1)));
     }
     
-    private void testPartialCommit(List<String> statements, int failureCount, 
int[] expectedUncommittedStatementIndexes, boolean willFail,
-                                   List<String> 
countStatementsForVerification, List<Integer> expectedCountsForVerification) {
+    private void testPartialCommit(List<String> statements, int[] 
expectedUncommittedStatementIndexes, boolean willFail, List<String> 
countStatementsForVerification,
+                                   List<Integer> 
expectedCountsForVerification) {
         Preconditions.checkArgument(countStatementsForVerification.size() == 
expectedCountsForVerification.size());
         
         try (Connection con = 
getConnectionWithTableOrderPreservingMutationState()) {
@@ -241,7 +244,6 @@ public class PartialCommitIT extends BaseOwnClusterIT {
                 }
                 assertEquals(CommitException.class, sqle.getClass());
                 int[] uncommittedStatementIndexes = 
((CommitException)sqle).getUncommittedStatementIndexes();
-                assertEquals(failureCount, uncommittedStatementIndexes.length);
                 assertArrayEquals(expectedUncommittedStatementIndexes, 
uncommittedStatementIndexes);
             }
             
@@ -267,7 +269,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         return new PhoenixConnection(phxCon, null) {
             @Override
             protected MutationState newMutationState(int maxSize) {
-                return new MutationState(maxSize, this, mutations, null);
+                return new MutationState(maxSize, this, mutations, null, null);
             };
         };
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 2043c66..ea4a7e1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -978,13 +978,12 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         scan.setFilter(new FirstKeyOnlyFilter());
         scan.setRaw(true);
         List<Cell> results = Lists.<Cell> newArrayList();
-        try (RegionScanner scanner = region.getScanner(scan);) {
+        try (RegionScanner scanner = region.getScanner(scan)) {
           scanner.next(results);
         }
-        // HBase ignores the time range on a raw scan (HBASE-7362)
-        if (!results.isEmpty() && results.get(0).getTimestamp() > 
clientTimeStamp) {
-            Cell kv = results.get(0);
-            if (kv.getTypeByte() == Type.Delete.getCode()) {
+        for (Cell kv : results) {
+            KeyValue.Type type = Type.codeToType(kv.getTypeByte());
+            if (type == Type.DeleteFamily) { // Row was deleted
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                         GlobalCache.getInstance(this.env).getMetaDataCache();
                 PTable table = newDeletedTableMarker(kv.getTimestamp());
@@ -1621,7 +1620,9 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                         && (table = buildTable(key, cacheKey, region, 
HConstants.LATEST_TIMESTAMP)) == null) {
                     // if not found then call newerTableExists and add delete 
marker for timestamp
                     // found
-                    if (buildDeletedTable(key, cacheKey, region, 
clientTimeStamp) != null) {
+                    table = buildDeletedTable(key, cacheKey, region, 
clientTimeStamp);
+                    if (table != null) {
+                        logger.info("Found newer table deleted as of " + 
table.getTimeStamp());
                         return new 
MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                                 EnvironmentEdgeManager.currentTimeMillis(), 
null);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 52bbeb6..fa5e962 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -25,6 +25,7 @@ import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -70,6 +71,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PLong;
@@ -109,8 +111,9 @@ import co.cask.tephra.hbase10.TransactionAwareHTable;
 public class MutationState implements SQLCloseable {
     private static final Logger logger = 
LoggerFactory.getLogger(MutationState.class);
     private static final TransactionCodec CODEC = new TransactionCodec();
+    private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
     
-    private PhoenixConnection connection;
+    private final PhoenixConnection connection;
     private final long maxSize;
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
mutations;
     private final List<TransactionAware> txAwares;
@@ -120,35 +123,39 @@ public class MutationState implements SQLCloseable {
     private Transaction tx;
     private long sizeOffset;
     private int numRows = 0;
-    private boolean txStarted = false;
+    private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
     
     public MutationState(long maxSize, PhoenixConnection connection) {
-        this(maxSize,connection, null);
+        this(maxSize,connection, null, null);
+    }
+    
+    public MutationState(long maxSize, PhoenixConnection connection, 
TransactionContext txContext) {
+        this(maxSize,connection, null, txContext);
     }
     
     public MutationState(MutationState mutationState) {
-        this(mutationState.maxSize, mutationState.connection, 
mutationState.getTransaction());
+        this(mutationState.maxSize, mutationState.connection, 
mutationState.getTransaction(), null);
     }
     
     public MutationState(long maxSize, PhoenixConnection connection, long 
sizeOffset) {
-        this(maxSize, connection, null, sizeOffset);
+        this(maxSize, connection, null, null, sizeOffset);
     }
     
-    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx) {
-        this(maxSize,connection, tx, 0);
+    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, TransactionContext txContext) {
+        this(maxSize,connection, tx, txContext, 0);
     }
     
-    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, long sizeOffset) {
-       this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()),
 tx);
+    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, TransactionContext txContext, long sizeOffset) {
+       this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()),
 tx, txContext);
         this.sizeOffset = sizeOffset;
     }
     
        MutationState(long maxSize, PhoenixConnection connection,
                        Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
mutations,
-                       Transaction tx) {
+                       Transaction tx, TransactionContext txContext) {
                this.maxSize = maxSize;
                this.connection = connection;
                this.mutations = mutations;
@@ -157,26 +164,34 @@ public class MutationState implements SQLCloseable {
                                : 
NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
                this.tx = tx;
                if (tx == null) {
-                       this.txAwares = Collections.emptyList();
-                       TransactionSystemClient txServiceClient = 
this.connection
-                                       
.getQueryServices().getTransactionSystemClient();
-                       this.txContext = new 
TransactionContext(txServiceClient);
+            this.txAwares = Collections.emptyList();
+                   if (txContext == null) {
+                       TransactionSystemClient txServiceClient = 
this.connection
+                                       
.getQueryServices().getTransactionSystemClient();
+                       this.txContext = new 
TransactionContext(txServiceClient);
+                   } else {
+                       this.txContext = txContext;
+                   }
                } else {
                        // this code path is only used while running child 
scans, we can't pass the txContext to child scans
                        // as it is not thread safe, so we use the tx member 
variable
-                       txAwares = Lists.newArrayList();
-                       txContext = null;
+                       this.txAwares = Lists.newArrayList();
+                       this.txContext = null;
                }
        }
 
     public MutationState(TableRef table, 
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, null, sizeOffset);
+        this(maxSize, connection, null, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
         this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
     
+    public long getMaxSize() {
+        return maxSize;
+    }
+    
     public boolean checkpointIfNeccessary(MutationPlan plan) throws 
SQLException {
         Transaction currentTx = getTransaction();
         if (getTransaction() == null || plan.getTargetRef() == null || 
plan.getTargetRef().getTable() == null || 
!plan.getTargetRef().getTable().isTransactional()) {
@@ -308,9 +323,8 @@ public class MutationState implements SQLCloseable {
                }
         
         try {
-            if (!txStarted) {
+            if (!isTransactionStarted()) {
                 txContext.start();
-                txStarted = true;
                 return true;
             }
         } catch (TransactionFailureException e) {
@@ -320,7 +334,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public static MutationState emptyMutationState(long maxSize, 
PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, connection, 
Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), 
null);
+        MutationState state = new MutationState(maxSize, connection, 
Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), 
null, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -599,18 +613,24 @@ public class MutationState implements SQLCloseable {
            Long scn = connection.getSCN();
            MetaDataClient client = new MetaDataClient(connection);
            long serverTimeStamp = tableRef.getTimeStamp();
-           PTable table = tableRef.getTable();
            // If we're auto committing, we've already validated the schema 
when we got the ColumnResolver,
            // so no need to do it again here.
            if (!connection.getAutoCommit()) {
+               PTable table = tableRef.getTable();
             MetaDataMutationResult result = 
client.updateCache(table.getSchemaName().getString(), 
table.getTableName().getString());
+            PTable resolvedTable = result.getTable();
+            if (resolvedTable == null) {
+                throw new 
TableNotFoundException(table.getSchemaName().getString(), 
table.getTableName().getString());
+            }
+            // Always update tableRef table as the one we've cached may be out 
of date since when we executed
+            // the UPSERT VALUES call and updated in the cache before this.
+            tableRef.setTable(resolvedTable);
             long timestamp = result.getMutationTime();
             if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
                 serverTimeStamp = timestamp;
                 if (result.wasUpdated()) {
                     // TODO: use bitset?
-                    table = result.getTable();
-                    PColumn[] columns = new PColumn[table.getColumns().size()];
+                    PColumn[] columns = new 
PColumn[resolvedTable.getColumns().size()];
                     for (Map.Entry<ImmutableBytesPtr,RowMutationState> 
rowEntry : rowKeyToColumnMap.entrySet()) {
                        RowMutationState valueEntry = rowEntry.getValue();
                         if (valueEntry != null) {
@@ -624,10 +644,9 @@ public class MutationState implements SQLCloseable {
                     }
                     for (PColumn column : columns) {
                         if (column != null) {
-                            
table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
+                            
resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                         }
                     }
-                    tableRef.setTable(table);
                 }
             }
         }
@@ -731,25 +750,28 @@ public class MutationState implements SQLCloseable {
             sendAll = true;
         }
 
+        List<TableRef> txTableRefs = 
Lists.newArrayListWithExpectedSize(mutations.size());
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing 
mutations to tables")) {
             Span span = trace.getSpan();
                ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
+               boolean isTransactional;
                while (tableRefIterator.hasNext()) {
                        // at this point we are going through mutations for 
each table
-                   TableRef tableRef = tableRefIterator.next();
+                   final TableRef tableRef = tableRefIterator.next();
                    Map<ImmutableBytesPtr, RowMutationState> valuesMap = 
mutations.get(tableRef);
                    if (valuesMap == null || valuesMap.isEmpty()) {
                        continue;
                    }
-                   PTable table = tableRef.getTable();
+                // Validate as we go if transactional since we can undo if a 
problem occurs (which is unlikely)
+                long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++];
+                   final PTable table = tableRef.getTable();
                    // Track tables to which we've sent uncommitted data
-                   if (table.isTransactional()) {
+                   if (isTransactional = table.isTransactional()) {
+                    txTableRefs.add(tableRef);
                        
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
                    }
                    boolean isDataTable = true;
-                   // Validate as we go if transactional since we can undo if 
a problem occurs (which is unlikely)
-                   long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++];
                 table.getIndexMaintainers(indexMetaDataPtr, connection);
                    Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
                    while (mutationsIterator.hasNext()) {
@@ -776,7 +798,7 @@ public class MutationState implements SQLCloseable {
                            SQLException sqlE = null;
                            HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
                            try {
-                               if (table.isTransactional()) {
+                               if (isTransactional) {
                                    // If we have indexes, wrap the HTable in a 
delegate HTable that
                                    // will attach the necessary index meta 
data in the event of a
                                    // rollback
@@ -830,8 +852,8 @@ public class MutationState implements SQLCloseable {
                                    }
                                    e = inferredE;
                                }
-                               // Throw to client with both what was committed 
so far and what is left to be committed.
-                               // That way, client can either undo what was 
done or try again with what was not done.
+                               // Throw to client an exception that indicates 
the statements that
+                               // were not committed successfully.
                                sqlE = new CommitException(e, 
getUncommittedStatementIndexes());
                            } finally {
                                try {
@@ -862,17 +884,24 @@ public class MutationState implements SQLCloseable {
                    if (tableRef.getTable().getType() != PTableType.INDEX) {
                        numRows -= valuesMap.size();
                    }
-                   // Remove batches as we process them
+                   // For transactions, track the statement indexes as we send 
data
+                   // over because our CommitException should include all 
statements
+                   // involved in the transaction since none of them would 
have been
+                   // committed in the event of a failure.
+                   if (isTransactional) {
+                       addUncommittedStatementIndexes(valuesMap.values());
+                   }
+                // Remove batches as we process them
                    if (sendAll) {
-                       tableRefIterator.remove(); // Iterating through actual 
map in this case
+                       // Iterating through map key set in this case, so we 
cannot use
+                       // the remove method without getting a concurrent 
modification
+                       // exception.
+                       tableRefIterator.remove();
                    } else {
                        mutations.remove(tableRef);
                    }
                }
         }
-        // Note that we cannot assume that *all* mutations have been sent, 
since we've optimized this
-        // now to only send the mutations for the tables we're querying, hence 
we've removed the
-        // assertions that we're here before.
     }
 
     public byte[] encodeTransaction() throws SQLException {
@@ -930,19 +959,17 @@ public class MutationState implements SQLCloseable {
         return cache;
     }
     
-    private void clear() throws SQLException {
-        this.mutations.clear();
-        numRows = 0;
+    private void addUncommittedStatementIndexes(Collection<RowMutationState> 
rowMutations) {
+        for (RowMutationState rowMutationState : rowMutations) {
+            uncommittedStatementIndexes = 
joinSortedIntArrays(uncommittedStatementIndexes, 
rowMutationState.getStatementIndexes());
+        }
     }
     
     private int[] getUncommittedStatementIndexes() {
-       int[] result = new int[0];
-       for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : 
mutations.values()) {
-               for (RowMutationState rowMutationState : rowMutations.values()) 
{
-                       result = joinSortedIntArrays(result, 
rowMutationState.getStatementIndexes());
-               }
+       for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : 
mutations.values()) {
+           addUncommittedStatementIndexes(rowMutationMap.values());
        }
-       return result;
+       return uncommittedStatementIndexes;
     }
     
     @Override
@@ -950,65 +977,96 @@ public class MutationState implements SQLCloseable {
     }
 
     private void reset() {
-        txStarted = false;
         tx = null;
+        txAwares.clear();
         uncommittedPhysicalNames.clear();
+        this.mutations.clear();
+        numRows = 0;
+        uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     }
     
     public void rollback() throws SQLException {
-        clear();
-        txAwares.clear();
-        if (txContext != null) {
-            try {
-                if (txStarted) {
+        try {
+            if (txContext != null && isTransactionStarted()) {
+                try {
                     txContext.abort();
+                } catch (TransactionFailureException e) {
+                    throw TransactionUtil.getTransactionFailureException(e);
                 }
-            } catch (TransactionFailureException e) {
-                throw new SQLException(e); // TODO: error code
-            } finally {
-               reset();
             }
+        } finally {
+            reset();
         }
     }
     
     public void commit() throws SQLException {
-       boolean sendMutationsFailed=false;
+       boolean sendSuccessful=false;
+       SQLException sqlE = null;
         try {
             send();
-        } catch (Throwable t) {
-               sendMutationsFailed=true;
-               throw t;
+            sendSuccessful=true;
+        } catch (SQLException e) {
+            sqlE = e;
         } finally {
-            txAwares.clear();
-            if (txContext != null) {
-                try {
-                    if (txStarted && !sendMutationsFailed) {
-                        txContext.finish();
-                    }
-                } catch (TransactionFailureException e) {
+            try {
+                if (txContext != null && isTransactionStarted()) {
+                    TransactionFailureException txFailure = null;
+                    boolean finishSuccessful=false;
                     try {
-                        txContext.abort(e);
-                        // abort and throw the original commit failure 
exception
-                        throw 
TransactionUtil.getTransactionFailureException(e);
-                    } catch (TransactionFailureException e1) {
-                        // if abort fails and throw the abort failure exception
-                        throw 
TransactionUtil.getTransactionFailureException(e1);
+                        if (sendSuccessful) {
+                            txContext.finish();
+                            finishSuccessful = true;
+                        }
+                    } catch (TransactionFailureException e) {
+                        txFailure = e;
+                        SQLException nextE = 
TransactionUtil.getTransactionFailureException(e);
+                        if (sqlE == null) {
+                            sqlE = nextE;
+                        } else {
+                            sqlE.setNextException(nextE);
+                        }
+                    } finally {
+                        // If send fails or finish fails, abort the tx
+                        if (!finishSuccessful) {
+                            try {
+                                txContext.abort(txFailure);
+                            } catch (TransactionFailureException e) {
+                                SQLException nextE = 
TransactionUtil.getTransactionFailureException(e);
+                                if (sqlE == null) {
+                                    sqlE = nextE;
+                                } else {
+                                    sqlE.setNextException(nextE);
+                                }
+                            }
+                        }
                     }
+                }
+            } finally {
+                try {
+                    reset();
                 } finally {
-                       if (!sendMutationsFailed) {
-                               reset();
-                       }
-                  }
+                    if (sqlE != null) {
+                        throw sqlE;
+                    }
+                }
             }
         }
     }
 
     /**
+     * Send to HBase any uncommitted data for transactional tables.
+     * @return true if any data was sent and false otherwise.
+     * @throws SQLException
+     */
+    public boolean sendUncommitted() throws SQLException {
+        return sendUncommitted(mutations.keySet().iterator());
+    }
+    /**
      * Support read-your-own-write semantics by sending uncommitted data to 
HBase prior to running a
      * query. In this way, they are visible to subsequent reads but are not 
actually committed until
      * commit is called.
      * @param tableRefs
-     * @return true if at least partially transactional and false otherwise.
+     * @return true if any data was sent and false otherwise.
      * @throws SQLException
      */
     public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws 
SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 3fecc18..2925b09 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -71,16 +71,19 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase10.TransactionAwareHTable;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase10.TransactionAwareHTable;
+
 /**
  * Do all the work of managing index updates for a transactional table from a 
single coprocessor. Since the transaction
  * manager essentially time orders writes through conflict detection, the 
logic to maintain a secondary index is quite a
@@ -196,7 +199,9 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         // run a scan to get the current state. We'll need this to delete
         // the existing index rows.
         Transaction tx = indexMetaData.getTransaction();
-        assert(tx != null);
+        if (tx == null) {
+            throw new NullPointerException("Expected to find transaction in 
metadata for " + env.getRegionInfo().getTable().getNameAsString());
+        }
         List<IndexMaintainer> indexMaintainers = 
indexMetaData.getIndexMaintainers();
         Set<ColumnReference> mutableColumns = 
Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
         for (IndexMaintainer indexMaintainer : indexMaintainers) {

Reply via email to