[ https://issues.apache.org/jira/browse/PHOENIX-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17338630#comment-17338630 ]
ASF GitHub Bot commented on PHOENIX-6387:
-----------------------------------------

gjacoby126 commented on a change in pull request #1215:
URL: https://github.com/apache/phoenix/pull/1215#discussion_r624274587

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
##########
@@ -456,6 +456,42 @@ public void testTenantViewUpsertWithIndex() throws Exception {
         tenantViewHelper(true);
     }
+    @Test
+    public void testOnDuplicateUpsertWithIndex() throws Exception {
+        if (this.isImmutable) {

Review comment:
   nit: can be Assume.assumeFalse(this.isImmutable)

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1014,17 +1166,47 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
             }
         }
+    /**
+     * In case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will be
+     * generated so release the row lock.

Review comment:
   nice optimization

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -926,57 +1051,84 @@ private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext
                 // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed by lastContext
                 if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration, TimeUnit.MILLISECONDS)) {
+                    LOG.debug(String.format("latch timeout context %s last %s", context, lastContext));
                     done = false;
-                    break;
                 }
                 // Acquire the locks again before letting the region proceed with data table updates
                 lockRows(context);
+                if (!done) {
+                    // previous concurrent batch did not complete so we have to retry this batch
+                    break;
+                } else {
+                    // read the phase again to determine the status of previous batch
+                    phase = lastContext.getCurrentPhase();
+                    LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext, phase));

Review comment:
   What does the toString() of a batch mutate context produce? If it outputs the actual mutations, this can be a problem for environments where logs aren't allowed to contain certain kinds of data.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -476,6 +500,53 @@ private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatch
             }
         }
+    /**
+     * Add the mutations generated by the ON DUPLICATE KEY UPDATE to the current batch.
+     * MiniBatchOperationInProgress#addOperationsFromCP() allows coprocessors to attach additional mutations

Review comment:
   Thanks for the good explanation.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
         properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
         desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
     }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference being that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE and the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally

Review comment:
   What if ON DUPLICATE KEY UPDATE does something that would generate a Delete mutation (like setting something to NULL)?
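The Javadoc in this hunk says ON DUPLICATE KEY UPDATE yields one Put plus an optional Delete carrying DeleteColumn cells for the columns set to null. A minimal sketch of that split, under stated assumptions: plain Java collections stand in for the HBase Put and Delete, the update expressions are assumed to be already evaluated per column, and the names OnDupSplitSketch, SplitResult and split are hypothetical, not Phoenix code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plain Java collections stand in for HBase Put/Delete.
final class OnDupSplitSketch {

    /** Columns to write (the "Put" part) and columns to clear (the "Delete" part). */
    static final class SplitResult {
        final Map<String, byte[]> putColumns = new LinkedHashMap<>();
        final List<String> deleteColumns = new ArrayList<>();
    }

    /**
     * Splits already-evaluated ON DUPLICATE KEY UPDATE results: a null value means
     * "set this column to NULL" and goes to the delete list; everything else is written.
     */
    static SplitResult split(Map<String, byte[]> evaluated) {
        SplitResult result = new SplitResult();
        for (Map.Entry<String, byte[]> e : evaluated.entrySet()) {
            if (e.getValue() == null) {
                result.deleteColumns.add(e.getKey());
            } else {
                result.putColumns.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, byte[]> evaluated = new LinkedHashMap<>();
        evaluated.put("COUNTER", new byte[] {1});
        evaluated.put("NOTE", null); // e.g. an update that sets NOTE to NULL
        SplitResult r = split(evaluated);
        // prints: put=[COUNTER] delete=[NOTE]
        System.out.println("put=" + r.putColumns.keySet() + " delete=" + r.deleteColumns);
    }
}
```

When the delete list comes back empty, only the Put would be emitted, which corresponds to the "optionally" in the Javadoc.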
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -885,8 +1001,7 @@ private void preparePostIndexMutations(BatchMutateContext context,
                 }
             }
         }
-        removePendingRows(context);
-        context.indexUpdates.clear();
+        // all cleanup will be done in postBatchMutateIndispensably()

Review comment:
   thanks for the helpful comment

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1138,10 +1329,10 @@ private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateC
             metricSource.updatePreIndexUpdateFailureTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - start);
             metricSource.incrementPreIndexUpdateFailures(dataTableName);
-            // Remove all locks as they are already unlocked. There is no need to unlock them again later when
-            // postBatchMutateIndispensably() is called
-            removePendingRows(context);
-            context.rowLocks.clear();
+            // Re-acquire all locks since we released them before making index updates
+            // Removal of reference counts and locks for the rows of this batch will be

Review comment:
   I don't follow why we want to relock rows here rather than later when we retry (if we retry)? If this is the last retry and we fail, then when do the rows get unlocked?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -520,29 +591,48 @@ private void populatePendingRows(BatchMutateContext context) {
                 context.multiMutationMap.put(row, stored);
             }
             stored.addAll(m);
+            Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);

Review comment:
   do we need to check || isAtomic(m) on line 585, or do we only group if we actually have an index?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
         properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
         desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
     }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference being that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE and the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set to null).
+     */
+    private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+        List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+        byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+        if (opBytes == null) { // Unexpected
+            return null;
+        }
+        Put put = null;
+        Delete delete = null;
+        // We cannot neither use the time stamp in the Increment to set the Get time range
+        // nor set the Put/Delete time stamp and have this be atomic as HBase does not
+        // handle that. Though we disallow using ON DUPLICATE KEY clause when the
+        // CURRENT_SCN is set, we still may have a time stamp set as of when the table
+        // was resolved on the client side. We need to ignore this as well due to limitations
+        // in HBase, but this isn't too bad as the time will be very close the the current
+        // time anyway.
+        long ts = HConstants.LATEST_TIMESTAMP;
+
+        byte[] rowKey = atomicPut.getRow();
+        ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(rowKey);
+        // Get the latest data row state
+        Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+        Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() : null;
+
+        if (PhoenixIndexBuilder.isDupKeyIgnore(opBytes)) {
+            if (currentDataRowState == null) {
+                // new row
+                mutations.add(atomicPut);
+            }
+            return mutations;
+        }
+
+        ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+        DataInputStream input = new DataInputStream(stream);
+        boolean skipFirstOp = input.readBoolean();
+        short repeat = input.readShort();
+        final int[] estimatedSizeHolder = {0};
+        List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3);
+
+        // store the columns that need to be read in the conditional expressions
+        final Set<ColumnReference> colsReadInExpr = new HashSet<>();
+        while (true) {
+            ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
+                @Override

Review comment:
   nit: good place for an Extract Method, since the logic's complex and not needed to follow the logic of the larger method. Also good to stick an extra comment explaining the column parsing.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
         properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
         desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
     }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference being that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE and the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set to null).
+     */
+    private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+        List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+        byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+        if (opBytes == null) { // Unexpected
+            return null;
+        }
+        Put put = null;
+        Delete delete = null;
+        // We cannot neither use the time stamp in the Increment to set the Get time range

Review comment:
   There's no longer an Increment, right?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
         properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
         desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
     }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference being that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE and the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally

Review comment:
   Looks like testDeleteOnSingleLowerCaseVarcharColumn already tests for this, so we're ok?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Conditional updates on tables with indexes
> ------------------------------------------
>
>                 Key: PHOENIX-6387
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6387
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.0.0, 4.15.0
>            Reporter: Kadir OZDEMIR
>            Assignee: Tanuj Khurana
>            Priority: Major
>
> For a row update done by using the UPSERT VALUES statement, the exact values
> of the columns to be updated are specified within the UPSERT statement.
> Regardless of whether a given row exists or not, after the update, we know
> what the content will be for these columns. However, this is not the case
> when the ON DUPLICATE KEY clause is added to the UPSERT VALUES statement. This
> clause makes the update conditional, and the end result is determined by
> the conditions stated within the clause and the current state of the row at
> the time the update is done. This clause also makes the UPSERT VALUES
> statement atomic.
> Currently, conditional updates are supported only for tables without indexes.
> The current design leverages an HBase atomic operation and cannot be extended
> to support tables with indexes, since that would require holding (HBase-level)
> row locks while doing index table updates over RPCs. This would result in
> cluster-wide deadlocks. The goal of this Jira is to redesign conditional
> updates to use Phoenix-level row locks instead of HBase-level row locks, so
> that tables with indexes are also supported, by leveraging the design of
> PHOENIX-6160, which simplifies concurrent mutation handling on tables with
> indexes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
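The conditional-update semantics described in the issue can be illustrated with a toy in-memory model. This is a sketch under explicit assumptions, not Phoenix's implementation: a ConcurrentHashMap of Long values stands in for the data table, and the hypothetical application-level ("Phoenix-level") row lock is one ReentrantLock per row key that is never evicted (the real design also tracks pending rows with reference counts). Each call reads the current row state while holding that row's lock, so the IGNORE/UPDATE decision and the write are atomic with respect to other callers on the same row.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;

// Hypothetical toy model of conditional upserts guarded by application-level row locks.
final class ConditionalUpsertSketch {
    // Stand-in for the data table: row key -> current value.
    private final Map<String, Long> rows = new ConcurrentHashMap<>();
    // One lock per row key, created on demand (never evicted in this sketch).
    private final Map<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(String key) {
        return rowLocks.computeIfAbsent(key, k -> new ReentrantLock());
    }

    /** Models ON DUPLICATE KEY IGNORE: inserts only if the row is absent; returns true if inserted. */
    boolean upsertIgnore(String key, long value) {
        ReentrantLock lock = lockFor(key);
        lock.lock();
        try {
            if (rows.containsKey(key)) {
                return false; // row exists: no mutation is generated
            }
            rows.put(key, value);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Models ON DUPLICATE KEY UPDATE: inserts the initial value if absent, else applies the update. */
    void upsertUpdate(String key, long initial, UnaryOperator<Long> update) {
        ReentrantLock lock = lockFor(key);
        lock.lock();
        try {
            Long current = rows.get(key); // read the latest row state under the lock
            rows.put(key, current == null ? initial : update.apply(current));
        } finally {
            lock.unlock();
        }
    }

    Long get(String key) {
        return rows.get(key);
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionalUpsertSketch table = new ConditionalUpsertSketch();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            // first call inserts 1, every later call increments: no lost updates
            pool.submit(() -> table.upsertUpdate("k", 1L, v -> v + 1));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(table.get("k"));              // 1000
        System.out.println(table.upsertIgnore("k", 0L)); // false
    }
}
```

The diff above notes that the row locks are released before index updates are made and re-acquired afterwards; this toy does not model index updates or that release/re-acquire step, only the read-decide-write atomicity per row.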