This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new 32279f5878 PHOENIX-7612 Fix Cell references in IndexRegionObserver
32279f5878 is described below

commit 32279f58785b0bd5f0e46911fa44799ac99f0f72
Author: tkhurana <[email protected]>
AuthorDate: Fri Jul 25 13:56:42 2025 -0700

    PHOENIX-7612 Fix Cell references in IndexRegionObserver
---
 .../java/org/apache/phoenix/util/MutationUtil.java | 79 +++++++++++++++++
 .../phoenix/hbase/index/IndexRegionObserver.java   |  6 +-
 .../org/apache/phoenix/util/MutationUtilTest.java  | 98 ++++++++++++++++++++++
 3 files changed, 182 insertions(+), 1 deletion(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/MutationUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/MutationUtil.java
new file mode 100644
index 0000000000..ce5bc69fd1
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/MutationUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Put;
+
+public class MutationUtil {
+
+  private MutationUtil() {
+  }
+
+  /**
+   * Creates a true deep copy of the Put Mutation, including deep copies of 
all cells if the cells
+   * are backed by off-heap. The Mutation(Mutation source) constructor always 
does a shallow copy of
+   * the cells.
+   * @param original The original Put Mutation to copy
+   * @return A new Put Mutation with deep copies of all fields and Cells
+   */
+  public static Put copyPut(Put original) throws IOException {
+    return copyPut(original, false);
+  }
+
+  /**
+   * Creates a true deep copy of the Put Mutation, including deep copies of 
all cells if the cells
+   * are backed by off-heap. The Mutation(Mutation source) constructor always 
does a shallow copy of
+   * the cells.
+   * @param original       The original Put Mutation to copy
+   * @param skipAttributes If true, the attributes are not copied
+   * @return A new Put Mutation with deep copies of all fields and Cells
+   */
+  public static Put copyPut(Put original, boolean skipAttributes) throws 
IOException {
+    // Copies the bytes internally
+    Put copy = new Put(original.getRow());
+
+    // Copy the fields in Mutation class
+    // Copy timestamp
+    copy.setTimestamp(original.getTimestamp());
+    // Copy durability
+    copy.setDurability(original.getDurability());
+
+    // Copy the fields in OperationWithAttributes class
+    // Copy attributes
+    if (!skipAttributes) {
+      for (Map.Entry<String, byte[]> entry : 
original.getAttributesMap().entrySet()) {
+        copy.setAttribute(entry.getKey(), entry.getValue().clone());
+      }
+    }
+    // copy priority
+    copy.setPriority(original.getPriority());
+
+    for (List<Cell> cells : original.getFamilyCellMap().values()) {
+      for (Cell cell : cells) {
+        // copy cell if needed
+        copy.add(CellUtil.cloneIfNecessary(cell));
+      }
+    }
+    return copy;
+  }
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index aab5fe1e44..e5106eb9b8 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -127,6 +127,7 @@ import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MutationUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerIndexUtil;
@@ -925,7 +926,10 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             }
             Put put = lastContext.getNextDataRowState(rowKeyPtr);
             if (put != null) {
-              context.dataRowStates.put(rowKeyPtr, new Pair<>(put, new 
Put(put)));
+              // We have detected a concurrent update so do a deep copy of the
+              // previous update but we can skip the attributes
+              Put copy = MutationUtil.copyPut(put, true);
+              context.dataRowStates.put(rowKeyPtr, new Pair<>(copy, new 
Put(copy)));
             }
           } else {
             // The last batch for this row key failed. We cannot use the 
memory state.
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/MutationUtilTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/MutationUtilTest.java
new file mode 100644
index 0000000000..69f549f947
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/MutationUtilTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.junit.Test;
+
+public class MutationUtilTest {
+
+  @Test
+  public void testPutCopy() throws IOException {
+    final byte[] rowKey = Bytes.toBytes("row1");
+    final byte[] family = Bytes.toBytes("f");
+    long ts = EnvironmentEdgeManager.currentTimeMillis();
+    final int numCols = 5;
+    final int priority = 255;
+    Put source = new Put(rowKey);
+    source.setTimestamp(ts);
+    source.setDurability(Durability.SKIP_WAL);
+    source.setPriority(priority);
+    // add the cells
+    for (int i = 0; i < numCols; i++) {
+      source.add(
+        createOffHeapCell(rowKey, family, Bytes.toBytes("col" + i), ts, 
Bytes.toBytes("v_" + i)));
+    }
+    // set some known attributes
+    source.setTTL(345);
+    source.setCellVisibility(new CellVisibility("secret"));
+    // set some custom attributes
+    source.setAttribute(BaseScannerRegionObserverConstants.CLIENT_VERSION, 
Bytes.toBytes("5.2.1"));
+    Put copied = MutationUtil.copyPut(source);
+    assertArrayEquals(source.getRow(), copied.getRow());
+    assertEquals(source.getTimestamp(), copied.getTimestamp());
+    assertEquals(source.getDurability(), copied.getDurability());
+    assertEquals(source.getPriority(), copied.getPriority());
+    assertTrue(areEqual(source.getAttributesMap(), copied.getAttributesMap()));
+    NavigableMap<byte[], List<Cell>> sourceFamilyCellMap = 
source.getFamilyCellMap();
+    NavigableMap<byte[], List<Cell>> copiedFamilyCellMap = 
copied.getFamilyCellMap();
+    assertEquals(sourceFamilyCellMap.size(), copiedFamilyCellMap.size());
+    List<Cell> sourceCells = sourceFamilyCellMap.get(family);
+    List<Cell> copiedCells = copiedFamilyCellMap.get(family);
+    assertEquals(sourceCells.size(), copiedCells.size());
+    sourceCells.stream().allMatch(cell -> cell instanceof ByteBufferKeyValue);
+    copiedCells.stream().allMatch(cell -> cell instanceof KeyValue);
+    copied = MutationUtil.copyPut(source, true);
+    assertTrue(copied.getAttributesMap().isEmpty());
+  }
+
+  private Cell createOffHeapCell(byte[] rowKey, byte[] family, byte[] 
qualifier, long ts,
+    byte[] value) {
+    KeyValue kv = new KeyValue(rowKey, family, qualifier, ts, 
KeyValue.Type.Put);
+    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+    ByteBufferUtils.copyFromArrayToBuffer(dbb, kv.getBuffer(), 0, 
kv.getBuffer().length);
+    ByteBufferKeyValue offheapKV = new ByteBufferKeyValue(dbb, 0, 
kv.getBuffer().length, 0);
+    return offheapKV;
+  }
+
+  private boolean areEqual(Map<String, byte[]> first, Map<String, byte[]> 
second) {
+    if (first.size() != second.size()) {
+      return false;
+    }
+    return first.entrySet().stream()
+      .allMatch(e -> Arrays.equals(e.getValue(), second.get(e.getKey())));
+  }
+}

Reply via email to