[ 
https://issues.apache.org/jira/browse/PHOENIX-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340446#comment-17340446
 ] 

ASF GitHub Bot commented on PHOENIX-6387:
-----------------------------------------

tkhurana commented on a change in pull request #1215:
URL: https://github.com/apache/phoenix/pull/1215#discussion_r627780981



##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor 
desc, Class<? extends IndexBu
       properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, 
builder.getName());
       desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, 
properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from 
PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference being that the code in 
PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase 
but in order
+     * to correctly support concurrent index mutations we need to always read 
the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and 
Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE and the 
row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put 
mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set 
to null).
+     */
+  private List<Mutation> generateOnDupMutations(BatchMutateContext context, 
Put atomicPut) throws IOException {
+      List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+      byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+      if (opBytes == null) { // Unexpected
+          return null;
+      }
+      Put put = null;
+      Delete delete = null;
+      // We cannot neither use the time stamp in the Increment to set the Get 
time range

Review comment:
       @gjacoby126 yes, there is no increment anymore. I will update the 
comment.

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor 
desc, Class<? extends IndexBu
       properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, 
builder.getName());
       desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, 
properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from 
PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference being that the code in 
PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase 
but in order
+     * to correctly support concurrent index mutations we need to always read 
the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and 
Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE and the 
row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put 
mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set 
to null).
+     */
+  private List<Mutation> generateOnDupMutations(BatchMutateContext context, 
Put atomicPut) throws IOException {
+      List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+      byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+      if (opBytes == null) { // Unexpected
+          return null;
+      }
+      Put put = null;
+      Delete delete = null;
+      // We cannot neither use the time stamp in the Increment to set the Get 
time range
+      // nor set the Put/Delete time stamp and have this be atomic as HBase 
does not
+      // handle that. Though we disallow using ON DUPLICATE KEY clause when the
+      // CURRENT_SCN is set, we still may have a time stamp set as of when the 
table
+      // was resolved on the client side. We need to ignore this as well due 
to limitations
+      // in HBase, but this isn't too bad as the time will be very close the 
the current
+      // time anyway.
+      long ts = HConstants.LATEST_TIMESTAMP;
+
+      byte[] rowKey = atomicPut.getRow();
+      ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(rowKey);
+      // Get the latest data row state
+      Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+      Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() 
: null;
+
+      if (PhoenixIndexBuilder.isDupKeyIgnore(opBytes)) {
+          if (currentDataRowState == null) {
+              // new row
+              mutations.add(atomicPut);
+          }
+          return mutations;
+      }
+
+      ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+      DataInputStream input = new DataInputStream(stream);
+      boolean skipFirstOp = input.readBoolean();
+      short repeat = input.readShort();
+      final int[] estimatedSizeHolder = {0};
+      List<Pair<PTable, List<Expression>>> operations = 
Lists.newArrayListWithExpectedSize(3);
+
+      // store the columns that need to be read in the conditional expressions
+      final Set<ColumnReference> colsReadInExpr = new HashSet<>();
+      while (true) {
+          ExpressionVisitor<Void> visitor = new 
StatelessTraverseAllExpressionVisitor<Void>() {
+              @Override

Review comment:
       @gjacoby126 good idea.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Conditional updates on tables with indexes
> ------------------------------------------
>
>                 Key: PHOENIX-6387
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6387
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.0.0, 4.15.0
>            Reporter: Kadir OZDEMIR
>            Assignee: Tanuj Khurana
>            Priority: Major
>
> For a row update done by using the UPSERT VALUES statement, the exact values 
> of the columns to be updated are specified within the UPSERT statement. 
> Regardless of whether a given row exists or not, after the update, we know 
> what the content will be for these columns. However, this is not the case 
> when the ON DUPLICATE KEY clause is added the UPSERT VALUES statement. This 
> clause makes the update conditional and the end result is determined based on 
> the conditions stated within the clause and the current state of the row at 
> the time the update is done. Also, this clause makes the UPSERT VALUES 
> statement atomic.
> Conditional updates are supported for the tables without indexes currently. 
> The current design leverages an HBase atomic operation and cannot be expanded 
> to support tables with indexes since the design requires holding (HBase 
> level) row locks while doing index table updates over RPCs. This results in 
> cluster wide deadlocks. This jira is to redesign conditional updates using 
> Phoenix level row locks instead of using HBase level row locks to also 
> support tables with indexes by leveraging the design of PHOENIX-6160 which 
> simplifies the concurrent mutation handling on tables with indexes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to