[ https://issues.apache.org/jira/browse/PHOENIX-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17338630#comment-17338630 ]

ASF GitHub Bot commented on PHOENIX-6387:
-----------------------------------------

gjacoby126 commented on a change in pull request #1215:
URL: https://github.com/apache/phoenix/pull/1215#discussion_r624274587



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
##########
@@ -456,6 +456,42 @@ public void testTenantViewUpsertWithIndex() throws Exception {
         tenantViewHelper(true);
     }
 
+    @Test
+    public void testOnDuplicateUpsertWithIndex() throws Exception {
+        if (this.isImmutable) {

Review comment:
       nit: can be Assume.assumeFalse(this.isImmutable)
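
       A minimal sketch of the suggested pattern (JUnit 4's Assume, which marks the
       test as skipped instead of trivially passing; the field name comes from the
       diff above):

           import org.junit.Assume;
           import org.junit.Test;

           @Test
           public void testOnDuplicateUpsertWithIndex() throws Exception {
               // Skip (rather than pass vacuously) when the table is immutable
               Assume.assumeFalse(this.isImmutable);
               // ... rest of the test body unchanged
           }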

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1014,17 +1166,47 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
         }
     }
 
+    /**
+     * In case of ON DUPLICATE KEY IGNORE, if the row already exists, no mutations will be
+     * generated, so release the row lock.

Review comment:
       nice optimization
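
       The win here: when ON DUPLICATE KEY IGNORE matches an existing row, the batch
       produces no mutations for it, so holding the row lock until batch completion
       is pure overhead. A hedged sketch of the idea (releaseRowLock() is a
       hypothetical helper, not necessarily what the patch calls):

           // No mutations were generated for this row, so nothing downstream
           // needs the lock; release it now instead of at batch completion
           if (generatedMutations.isEmpty()) {
               releaseRowLock(context, rowKeyPtr);  // hypothetical helper
           }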

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -926,57 +1051,84 @@ private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext
                 // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed by lastContext
                 if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
                         TimeUnit.MILLISECONDS)) {
+                    LOG.debug(String.format("latch timeout context %s last %s", context, lastContext));
                     done = false;
-                    break;
                 }
                 // Acquire the locks again before letting the region proceed with data table updates
                 lockRows(context);
+                if (!done) {
+                    // previous concurrent batch did not complete so we have to retry this batch
+                    break;
+                } else {
+                    // read the phase again to determine the status of previous batch
+                    phase = lastContext.getCurrentPhase();
+                    LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext, phase));

Review comment:
       what does the toString() of a batch mutate context produce? If it outputs
       the actual mutations, this can be a problem for environments where logs
       aren't allowed to contain certain kinds of data.
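
       If it does, one mitigation (a sketch only; batchId, rowLocks, and
       currentPhase are assumed field names, not necessarily the real ones) is a
       toString() that reports counts and identifiers but never cell data:

           @Override
           public String toString() {
               // Log only sizes, ids, and phase, never the mutation contents,
               // so sensitive cell values cannot end up in log files
               return "BatchMutateContext[batchId=" + batchId
                       + ", lockedRows=" + rowLocks.size()
                       + ", phase=" + currentPhase + "]";
           }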

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -476,6 +500,53 @@ private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatch
       }
   }
 
+    /**
+     * Add the mutations generated by the ON DUPLICATE KEY UPDATE to the current batch.
+     * MiniBatchOperationInProgress#addOperationsFromCP() allows coprocessors to attach additional mutations

Review comment:
       Thanks for the good explanation. 
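
       For readers unfamiliar with the hook, a rough sketch of the call shape (the
       surrounding variables are illustrative, not the patch's exact code):

           // Attach the generated Puts/Deletes to operation i of the mini-batch;
           // HBase then applies them together with that operation
           List<Mutation> generated = generateOnDupMutations(context, atomicPut);
           miniBatchOp.addOperationsFromCP(i, generated.toArray(new Mutation[0]));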

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
      properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
      desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase, but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE when the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
Review comment:
       What if ON DUPLICATE KEY UPDATE does something that would generate a Delete
       mutation (like setting something to NULL)?
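
       For context, a NULL assignment is expected to surface as a DeleteColumn-type
       cell issued alongside the Put, roughly like this (the family, qualifier, and
       ts variables are illustrative):

           // Setting a column to null in ON DUPLICATE KEY UPDATE removes it via
           // a DeleteColumn cell rather than writing an empty value
           Delete delete = new Delete(rowKey, ts);
           delete.addColumns(family, qualifier, ts);  // emits a DeleteColumn cell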

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -885,8 +1001,7 @@ private void preparePostIndexMutations(BatchMutateContext context,
                 }
             }
         }
-        removePendingRows(context);
-        context.indexUpdates.clear();
+        // all cleanup will be done in postBatchMutateIndispensably()

Review comment:
       thanks for the helpful comment

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1138,10 +1329,10 @@ private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateC
           metricSource.updatePreIndexUpdateFailureTime(dataTableName,
               EnvironmentEdgeManager.currentTimeMillis() - start);
           metricSource.incrementPreIndexUpdateFailures(dataTableName);
-          // Remove all locks as they are already unlocked. There is no need to unlock them again later when
-          // postBatchMutateIndispensably() is called
-          removePendingRows(context);
-          context.rowLocks.clear();
+          // Re-acquire all locks since we released them before making index updates
+          // Removal of reference counts and locks for the rows of this batch will be

Review comment:
       I don't follow why we want to relock rows here rather than later when we
       retry (if we retry)? If this is the last retry and we fail, then when do
       the rows get unlocked?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -520,29 +591,48 @@ private void populatePendingRows(BatchMutateContext context) {
                     context.multiMutationMap.put(row, stored);
                 }
                 stored.addAll(m);
+                Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);

Review comment:
       do we need to check || isAtomic(m) on line 585, or do we only group if 
we actually have an index? 

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
      properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
      desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase, but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE when the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set to null).
+     */
+  private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+      List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+      byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+      if (opBytes == null) { // Unexpected
+          return null;
+      }
+      Put put = null;
+      Delete delete = null;
+      // We can neither use the time stamp in the Increment to set the Get time range
+      // nor set the Put/Delete time stamp and have this be atomic as HBase does not
+      // handle that. Though we disallow using the ON DUPLICATE KEY clause when the
+      // CURRENT_SCN is set, we still may have a time stamp set as of when the table
+      // was resolved on the client side. We need to ignore this as well due to limitations
+      // in HBase, but this isn't too bad as the time will be very close to the current
+      // time anyway.
+      long ts = HConstants.LATEST_TIMESTAMP;
+
+      byte[] rowKey = atomicPut.getRow();
+      ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(rowKey);
+      // Get the latest data row state
+      Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+      Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() : null;
+
+      if (PhoenixIndexBuilder.isDupKeyIgnore(opBytes)) {
+          if (currentDataRowState == null) {
+              // new row
+              mutations.add(atomicPut);
+          }
+          return mutations;
+      }
+
+      ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+      DataInputStream input = new DataInputStream(stream);
+      boolean skipFirstOp = input.readBoolean();
+      short repeat = input.readShort();
+      final int[] estimatedSizeHolder = {0};
+      List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3);
+
+      // store the columns that need to be read in the conditional expressions
+      final Set<ColumnReference> colsReadInExpr = new HashSet<>();
+      while (true) {
+          ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
+              @Override

Review comment:
       nit: good place for an Extract Method, since the logic is complex and isn't
       needed to follow the logic of the larger method. Also good to stick an extra
       comment here explaining the column parsing.
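
       A sketch of what that extraction might look like (the method name is made
       up, and the visitor body is abridged from the hunk above):

           /**
            * Builds a visitor that records every column read by the ON DUPLICATE KEY
            * expressions, so the current row state can be projected onto exactly
            * those columns.
            */
           private ExpressionVisitor<Void> newColumnCollectingVisitor(
                   final Set<ColumnReference> colsReadInExpr) {
               return new StatelessTraverseAllExpressionVisitor<Void>() {
                   @Override
                   public Void visit(KeyValueColumnExpression expression) {
                       colsReadInExpr.add(new ColumnReference(
                               expression.getColumnFamily(), expression.getColumnQualifier()));
                       return null;
                   }
               };
           }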

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
      properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
      desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase, but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE when the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set to null).
+     */
+  private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+      List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+      byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+      if (opBytes == null) { // Unexpected
+          return null;
+      }
+      Put put = null;
+      Delete delete = null;
+      // We can neither use the time stamp in the Increment to set the Get time range

Review comment:
       There's no longer an Increment right?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
      properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
      desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase, but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE when the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally

Review comment:
       Looks like testDeleteOnSingleLowerCaseVarcharColumn already tests for 
this, so we're ok?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Conditional updates on tables with indexes
> ------------------------------------------
>
>                 Key: PHOENIX-6387
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6387
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.0.0, 4.15.0
>            Reporter: Kadir OZDEMIR
>            Assignee: Tanuj Khurana
>            Priority: Major
>
> For a row update done by using the UPSERT VALUES statement, the exact values 
> of the columns to be updated are specified within the UPSERT statement. 
> Regardless of whether a given row exists or not, after the update, we know 
> what the content will be for these columns. However, this is not the case 
> when the ON DUPLICATE KEY clause is added to the UPSERT VALUES statement. This 
> clause makes the update conditional and the end result is determined based on 
> the conditions stated within the clause and the current state of the row at 
> the time the update is done. Also, this clause makes the UPSERT VALUES 
> statement atomic.
> Conditional updates are currently supported only for tables without indexes. 
> The current design leverages an HBase atomic operation and cannot be expanded 
> to support tables with indexes since the design requires holding (HBase 
> level) row locks while doing index table updates over RPCs. This results in 
> cluster-wide deadlocks. This Jira is to redesign conditional updates using 
> Phoenix level row locks instead of using HBase level row locks to also 
> support tables with indexes by leveraging the design of PHOENIX-6160 which 
> simplifies the concurrent mutation handling on tables with indexes.
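>
> For illustration, the kind of statement this work enables on an indexed table
> (table and column names here are made up), issued through the Phoenix JDBC
> driver (uses java.sql.Connection, DriverManager, and Statement):
>
>     // Atomic conditional upsert: inserts the row with COUNTER = 0 if absent,
>     // otherwise increments COUNTER while the row lock is held
>     try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
>          Statement stmt = conn.createStatement()) {
>         stmt.executeUpdate(
>             "UPSERT INTO METRICS (ID, COUNTER) VALUES ('row1', 0) " +
>             "ON DUPLICATE KEY UPDATE COUNTER = COUNTER + 1");
>         conn.commit();
>     }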



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
