This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 32279f5878 PHOENIX-7612 Fix Cell references in IndexRegionObserver
32279f5878 is described below
commit 32279f58785b0bd5f0e46911fa44799ac99f0f72
Author: tkhurana <[email protected]>
AuthorDate: Fri Jul 25 13:56:42 2025 -0700
PHOENIX-7612 Fix Cell references in IndexRegionObserver
---
.../java/org/apache/phoenix/util/MutationUtil.java | 79 +++++++++++++++++
.../phoenix/hbase/index/IndexRegionObserver.java | 6 +-
.../org/apache/phoenix/util/MutationUtilTest.java | 98 ++++++++++++++++++++++
3 files changed, 182 insertions(+), 1 deletion(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/MutationUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/MutationUtil.java
new file mode 100644
index 0000000000..ce5bc69fd1
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/MutationUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Put;
+
+public class MutationUtil {
+
+  private MutationUtil() {
+  }
+
+  /**
+   * Copies a Put together with its attributes. This is the same as calling
+   * {@link #copyPut(Put, boolean)} with {@code skipAttributes} set to {@code false}.
+   * @param original The original Put Mutation to copy
+   * @return A new Put Mutation built from the row, timestamp, durability, attributes, priority
+   *         and cells of the original
+   * @throws IOException propagated from the underlying Put operations
+   */
+  public static Put copyPut(Put original) throws IOException {
+    final boolean skipAttributes = false;
+    return copyPut(original, skipAttributes);
+  }
+
+  /**
+   * Builds a new Put from the original one field by field instead of relying on the
+   * Mutation(Mutation source) constructor, which always does a shallow copy of the cells. Every
+   * cell is passed through {@link CellUtil#cloneIfNecessary(Cell)} before it is added to the new
+   * Put, so that cells backed by off-heap memory are cloned rather than referenced.
+   * @param original       The original Put Mutation to copy
+   * @param skipAttributes If true, the attributes are not copied
+   * @return A new Put Mutation carrying the row, timestamp, durability, priority, cells and
+   *         (unless skipped) attributes of the original
+   * @throws IOException propagated from the underlying Put operations
+   */
+  public static Put copyPut(Put original, boolean skipAttributes) throws IOException {
+    // The Put constructor copies the row key bytes internally
+    Put result = new Put(original.getRow());
+
+    // State that lives in the Mutation class
+    result.setDurability(original.getDurability());
+    result.setTimestamp(original.getTimestamp());
+
+    // State that lives in the OperationWithAttributes class
+    result.setPriority(original.getPriority());
+    if (!skipAttributes) {
+      // Clone every value so the two Puts never share an attribute array
+      original.getAttributesMap()
+        .forEach((name, value) -> result.setAttribute(name, value.clone()));
+    }
+
+    // Cells, family by family; a cell is only cloned when that is necessary
+    for (List<Cell> familyCells : original.getFamilyCellMap().values()) {
+      for (Cell familyCell : familyCells) {
+        result.add(CellUtil.cloneIfNecessary(familyCell));
+      }
+    }
+    return result;
+  }
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index aab5fe1e44..e5106eb9b8 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -127,6 +127,7 @@ import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MutationUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerIndexUtil;
@@ -925,7 +926,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
Put put = lastContext.getNextDataRowState(rowKeyPtr);
if (put != null) {
- context.dataRowStates.put(rowKeyPtr, new Pair<>(put, new
Put(put)));
+ // We have detected a concurrent update so do a deep copy of the
+ // previous update but we can skip the attributes
+ Put copy = MutationUtil.copyPut(put, true);
+ context.dataRowStates.put(rowKeyPtr, new Pair<>(copy, new
Put(copy)));
}
} else {
// The last batch for this row key failed. We cannot use the
memory state.
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/util/MutationUtilTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/MutationUtilTest.java
new file mode 100644
index 0000000000..69f549f947
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/MutationUtilTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.junit.Test;
+
+public class MutationUtilTest {
+
+  /**
+   * Verifies that {@link MutationUtil#copyPut(Put)} carries over the row key, timestamp,
+   * durability, priority, attributes and cells of a Put whose cells are backed by direct
+   * (off-heap) buffers, that the copied cells are on-heap {@link KeyValue} instances holding the
+   * expected values, and that {@link MutationUtil#copyPut(Put, boolean)} drops the attributes
+   * when asked to skip them.
+   */
+  @Test
+  public void testPutCopy() throws IOException {
+    final byte[] rowKey = Bytes.toBytes("row1");
+    final byte[] family = Bytes.toBytes("f");
+    long ts = EnvironmentEdgeManager.currentTimeMillis();
+    final int numCols = 5;
+    final int priority = 255;
+    Put source = new Put(rowKey);
+    source.setTimestamp(ts);
+    source.setDurability(Durability.SKIP_WAL);
+    source.setPriority(priority);
+    // add the cells
+    for (int i = 0; i < numCols; i++) {
+      source.add(
+        createOffHeapCell(rowKey, family, Bytes.toBytes("col" + i), ts, Bytes.toBytes("v_" + i)));
+    }
+    // set some known attributes
+    source.setTTL(345);
+    source.setCellVisibility(new CellVisibility("secret"));
+    // set some custom attributes
+    source.setAttribute(BaseScannerRegionObserverConstants.CLIENT_VERSION,
+      Bytes.toBytes("5.2.1"));
+    Put copied = MutationUtil.copyPut(source);
+    assertArrayEquals(source.getRow(), copied.getRow());
+    assertEquals(source.getTimestamp(), copied.getTimestamp());
+    assertEquals(source.getDurability(), copied.getDurability());
+    assertEquals(source.getPriority(), copied.getPriority());
+    assertTrue(areEqual(source.getAttributesMap(), copied.getAttributesMap()));
+    NavigableMap<byte[], List<Cell>> sourceFamilyCellMap = source.getFamilyCellMap();
+    NavigableMap<byte[], List<Cell>> copiedFamilyCellMap = copied.getFamilyCellMap();
+    assertEquals(sourceFamilyCellMap.size(), copiedFamilyCellMap.size());
+    List<Cell> sourceCells = sourceFamilyCellMap.get(family);
+    List<Cell> copiedCells = copiedFamilyCellMap.get(family);
+    assertEquals(sourceCells.size(), copiedCells.size());
+    // The stream results must be asserted, otherwise these checks can never fail
+    assertTrue(sourceCells.stream().allMatch(cell -> cell instanceof ByteBufferKeyValue));
+    assertTrue(copiedCells.stream().allMatch(cell -> cell instanceof KeyValue));
+    // The copied cells must hold the values that were written into the source cells
+    for (int i = 0; i < numCols; i++) {
+      byte[] expectedValue = Bytes.toBytes("v_" + i);
+      Cell copiedCell = copiedCells.get(i);
+      assertTrue(Bytes.equals(expectedValue, 0, expectedValue.length,
+        copiedCell.getValueArray(), copiedCell.getValueOffset(), copiedCell.getValueLength()));
+    }
+    copied = MutationUtil.copyPut(source, true);
+    assertTrue(copied.getAttributesMap().isEmpty());
+  }
+
+  /**
+   * Creates a Put cell whose serialized bytes live in a direct ByteBuffer.
+   * @param rowKey    row key of the cell
+   * @param family    column family of the cell
+   * @param qualifier column qualifier of the cell
+   * @param ts        timestamp of the cell
+   * @param value     value of the cell
+   * @return a {@link ByteBufferKeyValue} wrapping a direct buffer with the serialized cell
+   */
+  private Cell createOffHeapCell(byte[] rowKey, byte[] family, byte[] qualifier, long ts,
+    byte[] value) {
+    // Pass the value through so the cell actually carries it
+    KeyValue kv = new KeyValue(rowKey, family, qualifier, ts, KeyValue.Type.Put, value);
+    byte[] serialized = kv.getBuffer();
+    ByteBuffer dbb = ByteBuffer.allocateDirect(serialized.length);
+    ByteBufferUtils.copyFromArrayToBuffer(dbb, serialized, 0, serialized.length);
+    return new ByteBufferKeyValue(dbb, 0, serialized.length, 0);
+  }
+
+  /**
+   * Compares two attribute maps by key and by array content.
+   * @return true if both maps have the same size and every value of the first map is
+   *         content-equal to the value stored under the same key in the second map
+   */
+  private boolean areEqual(Map<String, byte[]> first, Map<String, byte[]> second) {
+    if (first.size() != second.size()) {
+      return false;
+    }
+    return first.entrySet().stream()
+      .allMatch(e -> Arrays.equals(e.getValue(), second.get(e.getKey())));
+  }
+}