Repository: phoenix
Updated Branches:
  refs/heads/txn bb7c74df8 -> d0572f214


PHOENIX-2375 Prevent write of Tephra delete markers in preDelete to ensure 
current data state hasn't change


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d0572f21
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d0572f21
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d0572f21

Branch: refs/heads/txn
Commit: d0572f214ea96fb82b888e15ec71dbc9f3141e1e
Parents: bb7c74d
Author: James Taylor <[email protected]>
Authored: Fri Nov 6 17:09:10 2015 -0800
Committer: James Taylor <[email protected]>
Committed: Fri Nov 6 17:09:10 2015 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/index/IndexIT.java   |  2 ++
 .../index/PhoenixTransactionalIndexer.java      | 34 +++++++++++---------
 .../org/apache/phoenix/schema/PTableImpl.java   | 21 +++++++++---
 3 files changed, 37 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0572f21/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
index 3738e5b..d2d5d21 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -126,6 +127,7 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     }
     
     @Test
+    @Ignore("Failing due to zero byte incorrectly being stripped from row 
key") // FIXME: fixed in master, so remove this ignore tag when merged.
     public void testDeleteFromAllPKColumnIndex() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0572f21/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 40ee817..68c1539 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -21,6 +21,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
@@ -67,11 +72,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase98.TransactionAwareHTable;
-
 /**
  * Do all the work of managing index updates for a transactional table from a 
single coprocessor. Since the transaction
  * manager essentially time orders writes through conflict detection, the 
logic to maintain a secondary index is quite a
@@ -232,14 +232,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             if (isRollback) {
                 processRollback(env, indexMetaData, txRollbackAttribute, 
currentScanner, mutations, tx, mutableColumns, indexUpdates);
             } else {
-                processScanner(env, indexMetaData, txRollbackAttribute, 
currentScanner, mutations, tx, mutableColumns, indexUpdates);
-                for (Mutation m : mutations.values()) {
-                    TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m);
-                    // if we did not generate valid put, we might have to 
generate a delete
-                    if (!generatePuts(indexMetaData, indexUpdates, state)) {
-                        generateDeletes(indexMetaData, indexUpdates, 
txRollbackAttribute, state);
-                    }
-                }
+                processMutation(env, indexMetaData, txRollbackAttribute, 
currentScanner, mutations, tx, mutableColumns, indexUpdates);
             }
         } finally {
             if (txTable != null) txTable.close();
@@ -248,7 +241,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         return indexUpdates;
     }
 
-    private void processScanner(RegionCoprocessorEnvironment env,
+    private void processMutation(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
             Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx,
@@ -257,6 +250,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         if (scanner != null) {
             Result result;
             ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
+            // Process existing data table rows by removing the old index row 
and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new 
ImmutableBytesPtr(result.getRow()));
                 TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
@@ -264,6 +258,11 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 generatePuts(indexMetaData, indexUpdates, state);
             }
         }
+        // Process new data table by adding new index rows
+        for (Mutation m : mutations.values()) {
+            TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m);
+            generatePuts(indexMetaData, indexUpdates, state);
+        }
     }
 
     private void processRollback(RegionCoprocessorEnvironment env,
@@ -274,10 +273,15 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             Collection<Pair<Mutation, byte[]>> indexUpdates) throws 
IOException {
         if (scanner != null) {
             Result result;
+            // Loop through last committed row state plus all new rows 
associated with current transaction
+            // to generate point delete markers for all index rows that were 
added. We don't have Tephra
+            // manage index rows in change sets because we don't want to be 
hit with the additional
+            // memory hit and do not need to do conflict detection on index 
rows.
             ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new 
ImmutableBytesPtr(result.getRow()));
-                // Sort by timestamp, type, cf, cq so we can process in time 
batches
+                // Sort by timestamp, type, cf, cq so we can process in time 
batches from oldest to newest
+                // (as if we're "replaying" them in time order).
                 List<Cell> cells = result.listCells();
                 Collections.sort(cells, new Comparator<Cell>() {
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0572f21/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 2d2d593..f6613f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import co.cask.tephra.TxConstants;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
@@ -616,7 +618,7 @@ public class PTableImpl implements PTable {
 
         private Put setValues;
         private Delete unsetValues;
-        private Delete deleteRow;
+        private Mutation deleteRow;
         private final long ts;
 
         public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, 
long ts, Integer bucketNum) {
@@ -720,11 +722,20 @@ public class PTableImpl implements PTable {
         @Override
         public void delete() {
             newMutations();
-            Delete delete = new Delete(key);
-            for (PColumnFamily colFamily : families) {
-               delete.deleteFamily(colFamily.getName().getBytes(), ts);
+            if (PTableImpl.this.isTransactional()) {
+                Put delete = new Put(key);
+                for (PColumnFamily colFamily : families) {
+                    delete.add(colFamily.getName().getBytes(), 
TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+                            HConstants.EMPTY_BYTE_ARRAY);
+                }
+                deleteRow = delete;                
+            } else {
+                Delete delete = new Delete(key);
+                for (PColumnFamily colFamily : families) {
+                       delete.deleteFamily(colFamily.getName().getBytes(), ts);
+                }
+                deleteRow = delete;
             }
-            deleteRow = delete;
             // No need to write to the WAL for indexes
             if (PTableImpl.this.getType() == PTableType.INDEX) {
                 deleteRow.setDurability(Durability.SKIP_WAL);

Reply via email to