Repository: phoenix
Updated Branches:
  refs/heads/master 596726089 -> 491fc54d9


PHOENIX-2824 PhoenixTransactionalIndexer rollback doesn't work correctly


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f4897fd9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f4897fd9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f4897fd9

Branch: refs/heads/master
Commit: f4897fd941749dfb22cf449dcda9943984850dca
Parents: 5967260
Author: James Taylor <[email protected]>
Authored: Thu May 5 15:58:02 2016 -0700
Committer: James Taylor <[email protected]>
Committed: Thu May 5 15:58:02 2016 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/tx/TxCheckpointIT.java   | 22 +++++++++++++-----
 .../index/PhoenixTransactionalIndexer.java      | 24 ++++++++++++--------
 2 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4897fd9/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index c7fc053..11b587a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -37,7 +37,6 @@ import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -49,10 +48,10 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import com.google.common.collect.Maps;
-
 import co.cask.tephra.Transaction.VisibilityLevel;
 
+import com.google.common.collect.Maps;
+
 @RunWith(Parameterized.class)
 public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
        
@@ -110,14 +109,25 @@ public class TxCheckpointIT extends 
BaseHBaseManagedTimeIT {
     }
     
     @Test
-    public void testRollbackOfUncommittedDelete() throws Exception {
+    public void testRollbackOfUncommittedDeleteSingleCol() throws Exception {
+        String indexDDL = "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + 
indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)";
+        testRollbackOfUncommittedDelete(indexDDL);
+    }
+
+    @Test
+    public void testRollbackOfUncommittedDeleteMultiCol() throws Exception {
+        String indexDDL = "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + 
indexName + " ON " + fullTableName + " (v1, v2)";
+        testRollbackOfUncommittedDelete(indexDDL);
+    }
+    
+    private void testRollbackOfUncommittedDelete(String indexDDL) throws 
Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(false);
         try {
             Statement stmt = conn.createStatement();
             stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY 
KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
-            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + 
indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)");
+            stmt.execute(indexDDL);
             
             stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 
'y1', 'a1')");
             stmt.executeUpdate("upsert into " + fullTableName + " values('x2', 
'y2', 'a2')");
@@ -182,7 +192,7 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
             //assert two rows in index table
             rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + 
" ORDER BY v1");
             assertTrue(rs.next());
-            assertEquals("x1", rs.getString(1));
+            assertEquals("x1", rs.getString(1)); // fails here
             assertEquals("y1", rs.getString(2));
             assertEquals("a1", rs.getString(3));
             assertTrue(rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f4897fd9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index e4c106e..d42e468 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -72,16 +73,16 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Longs;
-
 import co.cask.tephra.Transaction;
 import co.cask.tephra.Transaction.VisibilityLevel;
 import co.cask.tephra.TxConstants;
 import co.cask.tephra.hbase11.TransactionAwareHTable;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
+
 /**
  * Do all the work of managing index updates for a transactional table from a 
single coprocessor. Since the transaction
  * manager essentially time orders writes through conflict detection, the 
logic to maintain a secondary index is quite a
@@ -318,17 +319,20 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                     boolean hasPuts = false;
                     LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
                     long writePtr;
+                    Cell cell = cells.get(i);
                     do {
-                        Cell cell = cells.get(i);
                         hasPuts |= cell.getTypeByte() == 
KeyValue.Type.Put.getCode();
                         writePtr = cell.getTimestamp();
+                        ListIterator<Cell> it = singleTimeCells.listIterator();
                         do {
                             // Add at the beginning of the list to match the 
expected HBase
                             // newest to oldest sort order (which TxTableState 
relies on
-                            // with the Result.getLatestColumnValue() calls).
-                            singleTimeCells.addFirst(cell);
-                        } while (++i < nCells && cells.get(i).getTimestamp() 
== writePtr);
-                    } while (i < nCells && cells.get(i).getTimestamp() <= 
readPtr);
+                            // with the Result.getLatestColumnValue() calls). 
However, we
+                            // still want to add Cells in the expected order 
for each time
+                            // bound as otherwise we won't find it in our old 
state.
+                            it.add(cell);
+                        } while (++i < nCells && 
(cell=cells.get(i)).getTimestamp() == writePtr);
+                    } while (i < nCells && cell.getTimestamp() <= readPtr);
                     
                     // Generate point delete markers for the prior row 
deletion of the old index value.
                     // The write timestamp is the next timestamp, not the 
current timestamp,

Reply via email to