This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-5748-4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-5748-4.x-HBase-1.5 by this push:
     new 7233fa7  PHOENIX-5795 Supporting selective queries for index rows updated concurrently
7233fa7 is described below

commit 7233fa75aa8f14236feaf1201b0811b30d133300
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Mon Mar 23 10:48:18 2020 -0700

    PHOENIX-5795 Supporting selective queries for index rows updated concurrently
---
 .../end2end/ConcurrentMutationsExtendedIT.java     |  77 ++++++++++++
 .../phoenix/hbase/index/IndexRegionObserver.java   | 138 +++++++++++----------
 .../hbase/index/util/IndexManagementUtil.java      |  27 ++++
 3 files changed, 174 insertions(+), 68 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 546c1fe..d35451a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,7 +19,10 @@ package org.apache.phoenix.end2end;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
@@ -28,6 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.*;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -65,6 +69,18 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         // Now we rebuild the entire index table and expect that it is still good after the full rebuild
         IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
                 0, IndexTool.IndexVerifyType.AFTER);
+        // Truncate, rebuild and verify the index table
+        PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName);
+        TableName physicalTableName = TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        try (Admin admin = pConn.getQueryServices().getAdmin()) {
+            admin.disableTable(physicalTableName);
+            admin.truncateTable(physicalTableName, true);
+        }
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.AFTER);
+        long actualRowCountAfterCompaction = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        assertEquals(actualRowCount, actualRowCountAfterCompaction);
         return actualRowCount;
     }
 
@@ -265,6 +281,67 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    public void testConcurrentUpsertsWithNoIndexedColumns() throws Exception {
+        int nThreads = 4;
+        final int batchSize = 100;
+        final int nRows = 997;
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, 
b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
+                "CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, 
VERSIONS=1");
+        TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(v1) INCLUDE(v2, v3)");
+        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+        Runnable[] runnables = new Runnable[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            runnables[i] = new Runnable() {
+
+                @Override public void run() {
+                    try {
+                        Connection conn = DriverManager.getConnection(getUrl());
+                        for (int i = 0; i < 1000; i++) {
+                            if (RAND.nextInt() % 1000 < 10) {
+                                // Do not include the indexed column in upserts
+                                conn.createStatement().execute(
+                                        "UPSERT INTO " + tableName + " (k1, 
k2, b.v2, c.v3, d.v4) VALUES ("
+                                                + (RAND.nextInt() % nRows) + 
", 0, "
+                                                + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ")");
+                            } else {
+                                conn.createStatement().execute(
+                                        "UPSERT INTO " + tableName + " VALUES 
(" + (i % nRows) + ", 0, "
+                                                + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ")");
+                            }
+                            if ((i % batchSize) == 0) {
+                                conn.commit();
+                            }
+                        }
+                        conn.commit();
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        doneSignal.countDown();
+                    }
+                }
+
+            };
+        }
+        for (int i = 0; i < nThreads; i++) {
+            Thread t = new Thread(runnables[i]);
+            t.start();
+        }
+
+        assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+        verifyIndexTable(tableName, indexName, conn);
+    }
+
+    @Test
     public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
         final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
         final String indexName = generateUniqueName();
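
The truncate-and-rebuild verification added to verifyIndexTable() above is worth reading as a pattern; here is a minimal sketch, assuming a Phoenix Connection `conn` plus the tableName/indexName and the IndexToolIT/IndexScrutiny helpers already used in this test class:

    // Resolve the physical HBase table backing the Phoenix index
    PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName);
    TableName physicalTableName = TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
    // Wipe the index through the HBase Admin API; truncateTable() requires the
    // table to be disabled first, and preserveSplits=true keeps region boundaries
    try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
        admin.disableTable(physicalTableName);
        admin.truncateTable(physicalTableName, true);
    }
    // Rebuild the index from the data table, verifying every rebuilt row afterwards
    IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
            0, IndexTool.IndexVerifyType.AFTER);
    // Independent cross-check: data and index row counts must still agree
    long rowCountAfterRebuild = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
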
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 7d6907b..6246697 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -71,6 +72,7 @@ import org.apache.phoenix.hbase.index.LockManager.RowLock;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
@@ -86,6 +88,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -171,6 +174,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
       private boolean rebuild;
      // The current and next states of the data rows corresponding to the pending mutations
       private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
+      // Data table pending mutations
+      private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
+
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
@@ -409,42 +415,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-    /**
-     * This method is only used for local indexes
-     */
     private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                                          long now) throws IOException {
-        Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
-        boolean copyMutations = false;
-        for (int i = 0; i < miniBatchOp.size(); i++) {
-            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
-                continue;
-            }
-            Mutation m = miniBatchOp.getOperation(i);
-            if (this.builder.isEnabled(m)) {
-                // Track whether or not we need to
-                ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-                if (mutationsMap.containsKey(row)) {
-                    copyMutations = true;
-                } else {
-                    mutationsMap.put(row, null);
-                }
-            }
-        }
-        // early exit if it turns out we don't have any edits
-        if (mutationsMap.isEmpty()) {
-            return null;
-        }
-        // If we're copying the mutations
-        Collection<Mutation> originalMutations;
-        Collection<? extends Mutation> mutations;
-        if (copyMutations) {
-            originalMutations = null;
-            mutations = mutationsMap.values();
-        } else {
-            originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
-            mutations = originalMutations;
-        }
+                                                          BatchMutateContext context) throws IOException {
+        context.multiMutationMap = new HashMap<>();
         for (int i = 0; i < miniBatchOp.size(); i++) {
             Mutation m = miniBatchOp.getOperation(i);
             // skip this mutation if we aren't enabling indexing
@@ -452,37 +425,17 @@ public class IndexRegionObserver extends BaseRegionObserver {
              // should be indexed, which means we need to expose another method on the builder. Such is the
             // way optimization go though.
             if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
-                // Unless we're replaying edits to rebuild the index, we update the time stamp
-                // of the data table to prevent overlapping time stamps (which prevents index
-                // inconsistencies as this case isn't handled correctly currently).
-                for (List<Cell> cells : m.getFamilyCellMap().values()) {
-                    for (Cell cell : cells) {
-                        CellUtil.setTimestamp(cell, now);
-                    }
-                }
-                // Only copy mutations if we found duplicate rows
-                // which only occurs when we're partially rebuilding
-                // the index (since we'll potentially have both a
-                // Put and a Delete mutation for the same row).
-                if (copyMutations) {
-                    // Add the mutation to the batch set
-                    ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-                    MultiMutation stored = mutationsMap.get(row);
+                ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                MultiMutation stored = context.multiMutationMap.get(row);
+                if (stored == null) {
                     // we haven't seen this row before, so add it
-                    if (stored == null) {
-                        stored = new MultiMutation(row);
-                        mutationsMap.put(row, stored);
-                    }
-                    stored.addAll(m);
-                } else {
-                    originalMutations.add(m);
+                    stored = new MultiMutation(row);
+                    context.multiMutationMap.put(row, stored);
                 }
+                stored.addAll(m);
             }
         }
-        if (copyMutations) {
-            mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
-        }
-        return mutations;
+        return context.multiMutationMap.values();
     }
 
     public static void setTimestamp(Mutation m, long ts) throws IOException {
@@ -808,7 +761,37 @@ public class IndexRegionObserver extends BaseRegionObserver {
         return (PhoenixIndexMetaData)indexMetaData;
     }
 
-    private void preparePostIndexMutations(BatchMutateContext context, long now, PhoenixIndexMetaData indexMetaData)
+    /**
+     * IndexMaintainer.getIndexedColumns() returns the data column references for indexed columns. The data columns are
+     * grouped into three classes, pk columns (data table pk columns), the indexed columns (the columns for which
+     * we want to have indexing; they form the prefix for the primary key for the index table (after salt and tenant id))
+     * and covered columns. The purpose of this method is to find out if all the indexed columns are included in the
+     * pending data table mutation pointed by multiMutation.
+     */
+    private boolean hasAllIndexedColumns(IndexMaintainer indexMaintainer, MultiMutation multiMutation) {
+        Map<byte[], List<Cell>> familyMap = multiMutation.getFamilyCellMap();
+        for (ColumnReference columnReference : indexMaintainer.getIndexedColumns()) {
+            byte[] family = columnReference.getFamily();
+            List<Cell> cellList = familyMap.get(family);
+            if (cellList == null) {
+                return false;
+            }
+            boolean has = false;
+            for (Cell cell : cellList) {
+                if (CellUtil.matchingColumn(cell, family, columnReference.getQualifier())) {
+                    has = true;
+                    break;
+                }
+            }
+            if (!has) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void preparePostIndexMutations(BatchMutateContext context, long now, PhoenixIndexMetaData indexMetaData,
+                                           String tableName)
             throws Throwable {
         context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
         List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
@@ -818,7 +801,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
             byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             HTableInterfaceReference hTableInterfaceReference =
                     new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-            List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+            List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
             for (Pair<Mutation, byte[]> update : updates) {
                 // Are there concurrent updates on the data table row? if so, skip post index updates
                 // and let read repair resolve conflicts
@@ -831,13 +814,30 @@ public class IndexRegionObserver extends BaseRegionObserver {
                         // Set the status of the index row to "verified"
                         verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
                         context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
-                    }
-                    else {
+                    } else {
                         context.postIndexUpdates.put(hTableInterfaceReference, m);
                     }
+                } else {
+                    if (!hasAllIndexedColumns(indexMaintainer, context.multiMutationMap.get(rowKey))) {
+                        // This batch needs to be retried since one of the concurrent mutations does not have the value
+                        // for an indexed column. Not including an index column may lead to incorrect index row key
+                        // generation for concurrent mutations since concurrent mutations are not serialized entirely
+                        // and do not see each other's effect on data table. Throwing an IOException will result in
+                        // retries of this batch. Before throwing exception, we need to remove reference counts and
+                        // locks for the rows of this batch
+                        removePendingRows(context);
+                        context.indexUpdates.clear();
+                        for (RowLock rowLock : context.rowLocks) {
+                            rowLock.release();
+                        }
+                        context.rowLocks.clear();
+                        throw new IOException("One of the concurrent mutations does not have all indexed columns. " +
+                                "The batch needs to be retried " + tableName);
+                    }
                 }
             }
         }
+
         // We are done with handling concurrent mutations. So we can remove the rows of this batch from
         // the collection of pending rows
         removePendingRows(context);
@@ -915,7 +915,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
         // Prepare current and next data rows states for pending mutations (for global indexes)
         prepareDataRowStates(c, miniBatchOp, context, now);
         // Group all the updates for a single row into a single update to be processed (for local indexes)
-        Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now);
+        Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
         // early exit if it turns out we don't have any edits
         if (mutations == null || mutations.isEmpty()) {
             return;
@@ -943,11 +943,13 @@ public class IndexRegionObserver extends BaseRegionObserver {
         }
         context.rowLocks.clear();
         context.rowLocks = rowLocks;
-        preparePostIndexMutations(context, now, indexMetaData);
+        preparePostIndexMutations(context, now, indexMetaData,
+                c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
         if (failDataTableUpdatesForTesting) {
             throw new DoNotRetryIOException("Simulating the data table write failure");
         }
     }
+
   private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
   }
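
The concurrency fix above rests on hasAllIndexedColumns(): a concurrent mutation can only produce a correct index row key if it carries a value for every indexed column, since concurrent batches are not serialized entirely and do not see each other's effect on the data table. A standalone sketch of the same membership check (assuming the HBase/Phoenix imports used in the file above; the method name here is illustrative, not the committed one):

    // Returns true only if the mutation carries a cell for every indexed column
    static boolean coversAllIndexedColumns(Mutation m, Collection<ColumnReference> indexedColumns) {
        Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
        for (ColumnReference ref : indexedColumns) {
            List<Cell> cells = familyMap.get(ref.getFamily());
            if (cells == null) {
                return false; // the whole column family is absent from the mutation
            }
            boolean found = false;
            for (Cell cell : cells) {
                if (CellUtil.matchingColumn(cell, ref.getFamily(), ref.getQualifier())) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                return false; // family is present, but not this qualifier
            }
        }
        return true;
    }

When the check fails for a row with concurrent updates, the observer releases its pending-row references and row locks and throws a plain, retriable IOException, so the client resubmits the batch rather than the server writing an index row keyed on a stale indexed-column value.
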
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 84ccdb1..f6cf813 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -282,4 +282,31 @@ public class IndexManagementUtil {
           }
           return flattenedMutations;
       }
+
+    public static Collection<? extends Mutation> flattenMutationsByType(Collection<? extends Mutation> mutations)
+            throws IOException{
+        List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size());
+        for (Mutation m : mutations) {
+            Put put = null;
+            Delete del = null;
+            for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                for (Cell cell : cells) {
+                    if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                        if (put == null) {
+                            put = new Put(cell.getRow());
+                            flattenedMutations.add(put);
+                        }
+                        put.add(cell);
+                    } else {
+                        if (del == null) {
+                            del = new Delete(cell.getRow());
+                            flattenedMutations.add(del);
+                        }
+                        del.addDeleteMarker(cell);
+                    }
+                }
+            }
+        }
+        return flattenedMutations;
+    }
 }
\ No newline at end of file
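
The new flattenMutationsByType() helper splits each mutation's cells back into one Put (the put cells) and one Delete (the delete markers) per row, mirroring the existing flattenMutationsByTimestamp() above it. A minimal usage sketch; the row, family, and qualifier values are illustrative, and only calls already shown in this commit (MultiMutation.addAll, the helper itself) plus standard HBase client builders are assumed:

    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("a");
    MultiMutation grouped = new MultiMutation(new ImmutableBytesPtr(row));
    // Mix put cells and delete markers for the same row into one grouped mutation
    grouped.addAll(new Put(row).addColumn(cf, Bytes.toBytes("v1"), 1L, Bytes.toBytes(42)));
    grouped.addAll(new Delete(row).addColumns(cf, Bytes.toBytes("v2"), 1L));
    // Split back by cell type: one Put and one Delete come out for this row
    Collection<? extends Mutation> flattened =
            IndexManagementUtil.flattenMutationsByType(Collections.singletonList(grouped));
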
