Repository: phoenix
Updated Branches:
  refs/heads/master 596726089 -> 491fc54d9
PHOENIX-2824 PhoenixTransactionalIndexer rollback doesn't work correctly Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f4897fd9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f4897fd9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f4897fd9 Branch: refs/heads/master Commit: f4897fd941749dfb22cf449dcda9943984850dca Parents: 5967260 Author: James Taylor <[email protected]> Authored: Thu May 5 15:58:02 2016 -0700 Committer: James Taylor <[email protected]> Committed: Thu May 5 15:58:02 2016 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/tx/TxCheckpointIT.java | 22 +++++++++++++----- .../index/PhoenixTransactionalIndexer.java | 24 ++++++++++++-------- 2 files changed, 30 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4897fd9/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index c7fc053..11b587a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -37,7 +37,6 @@ import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -49,10 +48,10 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; 
-import com.google.common.collect.Maps; - import co.cask.tephra.Transaction.VisibilityLevel; +import com.google.common.collect.Maps; + @RunWith(Parameterized.class) public class TxCheckpointIT extends BaseHBaseManagedTimeIT { @@ -110,14 +109,25 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT { } @Test - public void testRollbackOfUncommittedDelete() throws Exception { + public void testRollbackOfUncommittedDeleteSingleCol() throws Exception { + String indexDDL = "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)"; + testRollbackOfUncommittedDelete(indexDDL); + } + + @Test + public void testRollbackOfUncommittedDeleteMultiCol() throws Exception { + String indexDDL = "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"; + testRollbackOfUncommittedDelete(indexDDL); + } + + private void testRollbackOfUncommittedDelete(String indexDDL) throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); - stmt.execute("CREATE "+(localIndex? 
"LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)"); + stmt.execute(indexDDL); stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')"); stmt.executeUpdate("upsert into " + fullTableName + " values('x2', 'y2', 'a2')"); @@ -182,7 +192,7 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT { //assert two rows in index table rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1"); assertTrue(rs.next()); - assertEquals("x1", rs.getString(1)); + assertEquals("x1", rs.getString(1)); // fails here assertEquals("y1", rs.getString(2)); assertEquals("a1", rs.getString(3)); assertTrue(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4897fd9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index e4c106e..d42e468 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Set; @@ -72,16 +73,16 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.primitives.Longs; - import co.cask.tephra.Transaction; import co.cask.tephra.Transaction.VisibilityLevel; import co.cask.tephra.TxConstants; import co.cask.tephra.hbase11.TransactionAwareHTable; +import 
com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.primitives.Longs; + /** * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction * manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a @@ -318,17 +319,20 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { boolean hasPuts = false; LinkedList<Cell> singleTimeCells = Lists.newLinkedList(); long writePtr; + Cell cell = cells.get(i); do { - Cell cell = cells.get(i); hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode(); writePtr = cell.getTimestamp(); + ListIterator<Cell> it = singleTimeCells.listIterator(); do { // Add at the beginning of the list to match the expected HBase // newest to oldest sort order (which TxTableState relies on - // with the Result.getLatestColumnValue() calls). - singleTimeCells.addFirst(cell); - } while (++i < nCells && cells.get(i).getTimestamp() == writePtr); - } while (i < nCells && cells.get(i).getTimestamp() <= readPtr); + // with the Result.getLatestColumnValue() calls). However, we + // still want to add Cells in the expected order for each time + // bound as otherwise we won't find it in our old state. + it.add(cell); + } while (++i < nCells && (cell=cells.get(i)).getTimestamp() == writePtr); + } while (i < nCells && cell.getTimestamp() <= readPtr); // Generate point delete markers for the prior row deletion of the old index value. // The write timestamp is the next timestamp, not the current timestamp,
