This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 849841aa04 PHOENIX-7473 Eliminating index maintenance for CDC index
(#2030)
849841aa04 is described below
commit 849841aa04d82bc3e06f770a492f9c94df528ff6
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Thu Nov 28 00:45:12 2024 -0800
PHOENIX-7473 Eliminating index maintenance for CDC index (#2030)
---
.../coprocessor/CDCGlobalIndexRegionScanner.java | 24 +++-----
.../coprocessor/UncoveredIndexRegionScanner.java | 14 +++--
.../phoenix/hbase/index/IndexRegionObserver.java | 64 ++++++++++++++--------
3 files changed, 57 insertions(+), 45 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 4bde0727e9..4cbb4d6147 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -99,20 +99,12 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
- // firstCell: Picking the earliest cell in the index row so that
- // timestamp of the cell and the row will be same.
- Cell firstIndexCell = indexRow.get(indexRow.size() - 1);
- byte[] indexRowKey =
ImmutableBytesPtr.cloneCellRowIfNecessary(firstIndexCell);
+ Cell indexCell = indexRow.get(0);
+ byte[] indexRowKey =
ImmutableBytesPtr.cloneCellRowIfNecessary(indexCell);
ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
indexToDataRowKeyMap.get(indexRowKey));
Result dataRow = dataRows.get(dataRowKey);
- int phoenixRowTimestampFunctionOffset = 2 +
(indexMaintainer.isMultiTenant() ? 1 : 0)
- + (indexMaintainer.getViewIndexId() != null ? 1 : 0);
- ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-
indexMaintainer.getIndexRowKeySchema().iterator(firstIndexCell.getRowArray(),
- firstIndexCell.getRowOffset(),
firstIndexCell.getRowLength(), ptr,
- phoenixRowTimestampFunctionOffset);
- long changeTS = PLong.INSTANCE.getCodec().decodeLong(ptr,
SortOrder.ASC);
+ long changeTS = indexCell.getTimestamp();
TupleProjector dataTableProjector =
cdcDataTableInfo.getDataTableProjector();
Expression[] expressions = dataTableProjector != null ?
dataTableProjector.getExpressions() : null;
@@ -198,9 +190,9 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
}
}
if (changeBuilder.isNonEmptyEvent()) {
- Result cdcRow = getCDCImage(indexRowKey,
firstIndexCell);
+ Result cdcRow = getCDCImage(indexRowKey, indexCell);
if (cdcRow != null && tupleProjector != null) {
- if (firstIndexCell.getType() ==
Cell.Type.DeleteFamily) {
+ if (indexCell.getType() == Cell.Type.DeleteFamily)
{
// result is of type
EncodedColumnQualiferCellsList for queries with
// Order by clause. It fails when Delete
Family cell is added to it
// as it expects column qualifier bytes which
is not available.
@@ -208,13 +200,13 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
result.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(indexRowKey)
.setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(
- firstIndexCell))
+ indexCell))
.setQualifier(indexMaintainer.getEmptyKeyValueQualifier())
-
.setTimestamp(firstIndexCell.getTimestamp())
+ .setTimestamp(indexCell.getTimestamp())
.setType(Cell.Type.Put)
.setValue(EMPTY_BYTE_ARRAY).build());
} else {
- result.add(firstIndexCell);
+ result.add(indexCell);
}
IndexUtil.addTupleAsOneCell(result, new
ResultTuple(cdcRow),
tupleProjector, ptr);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index aa0ac08c43..70977a545b 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -301,10 +301,13 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
put.add(cell);
}
if (indexMaintainer.isCDCIndex()) {
- // A CDC index row key is PARTITION_ID() + PHOENIX_ROW_TIMESTAMP()
+ data row key. The
- // only necessary check is the row timestamp check since the data
row key is extracted
- // from the index row key and PARTITION_ID() changes during region
splits and merges
- if (IndexUtil.getMaxTimestamp(put) == indexTimestamp) {
+ // A CDC index row key is [view index id] + [tenant id] +
PARTITION_ID()
+ // + PHOENIX_ROW_TIMESTAMP() + data row key. The only necessary
check is the row
+ // timestamp check since the data row key is extracted from the
index row key and
+ // PARTITION_ID() changes during region splits and merges so we
cannot check it.
+ // If the scan is a raw scan which is expected to be the case for
CDC scans,
+ // even the time check is not necessary
+ if (scan.isRaw() || IndexUtil.getMaxTimestamp(put) ==
indexTimestamp) {
return true;
}
} else if (indexMaintainer.checkIndexRow(indexRowKey, put)) {
@@ -324,7 +327,8 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
return true;
}
// This is not a valid index row
- if (indexMaintainer.isAgedEnough(IndexUtil.getMaxTimestamp(put),
ageThreshold)) {
+ if (indexMaintainer.isAgedEnough(IndexUtil.getMaxTimestamp(put),
ageThreshold)
+ && !indexMaintainer.isCDCIndex()) {
region.delete(indexMaintainer.createDelete(indexRowKey,
IndexUtil.getMaxTimestamp(put),
false));
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 8a144f1245..1223c3a980 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -40,8 +40,10 @@ import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.CaseExpression;
import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
@@ -85,7 +87,6 @@ import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
-import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
@@ -270,9 +271,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// passed MiniBatchOperationInProgress, like preWALAppend()
private List<Mutation> originalMutations;
private boolean hasAtomic;
- private boolean hasDelete;
- private boolean hasUncoveredIndex;
- private boolean hasGlobalIndex;
+ private boolean hasRowDelete;
+ private boolean hasUncoveredIndex; // Has uncovered global indexes which
are not CDC Indexes
+ private boolean hasGlobalIndex; // Covered global index
private boolean hasLocalIndex;
private boolean hasTransform;
private boolean returnResult;
@@ -978,8 +979,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
/**
- * Generate the index update for a data row from the mutation that are
obtained by merging the previous data row
- * state with the pending row mutation.
+ * Generate the index update for a data row from the mutation that is
obtained by merging the
+ * previous data row state with the pending row mutation.
*/
private void prepareIndexMutations(BatchMutateContext context,
List<IndexMaintainer> maintainers, long ts)
throws IOException {
@@ -1023,13 +1024,16 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
QueryConstants.UNVERIFIED_BYTES);
context.indexUpdates.put(hTableInterfaceReference,
new Pair<Mutation, byte[]>(indexPut,
rowKeyPtr.get()));
- // Delete the current index row if the new index key is
different than the current one
+ // Delete the current index row if the new index key is
different from the
+ // current one and the index is not a CDC index
if (currentDataRowState != null) {
ValueGetter currentDataRowVG = new
IndexUtil.SimpleValueGetter(currentDataRowState);
byte[] indexRowKeyForCurrentDataRow = indexMaintainer
.buildRowKey(currentDataRowVG, rowKeyPtr,
null, null,
ts, encodedRegionName);
- if (Bytes.compareTo(indexPut.getRow(),
indexRowKeyForCurrentDataRow) != 0) {
+ if (!indexMaintainer.isCDCIndex()
+ && Bytes.compareTo(indexPut.getRow(),
indexRowKeyForCurrentDataRow)
+ != 0) {
Mutation del =
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
IndexMaintainer.DeleteType.ALL_VERSIONS,
ts);
context.indexUpdates.put(hTableInterfaceReference,
@@ -1038,14 +1042,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
} else if (currentDataRowState != null
&&
indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)) {
- context.indexUpdates.put(hTableInterfaceReference,
- new Pair<Mutation,
byte[]>(getDeleteIndexMutation(currentDataRowState,
- indexMaintainer, ts, rowKeyPtr,
encodedRegionName),
- rowKeyPtr.get()));
if (indexMaintainer.isCDCIndex()) {
- // CDC Index needs two delete markers one for deleting
the index row, and
- // the other for referencing the data table delete
mutation with the
- // right index row key, that is, the index row key
starting with ts
+ // CDC Index needs a delete marker for
referencing the data table
+ // delete mutation with the right index row key, that
is, the index row key
+ // starting with ts
Put cdcDataRowState = new
Put(currentDataRowState.getRow());
cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
@@ -1054,6 +1054,11 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
new Pair<Mutation,
byte[]>(getDeleteIndexMutation(cdcDataRowState,
indexMaintainer, ts, rowKeyPtr,
encodedRegionName),
rowKeyPtr.get()));
+ } else {
+ context.indexUpdates.put(hTableInterfaceReference,
+ new Pair<Mutation,
byte[]>(getDeleteIndexMutation(currentDataRowState,
+ indexMaintainer, ts, rowKeyPtr,
encodedRegionName),
+ rowKeyPtr.get()));
}
}
}
@@ -1061,10 +1066,11 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
/**
- * This method prepares unverified index mutations which are applied to
index tables before the data table is
- * updated. In the three-phase update approach, in phase 1, the status of
existing index rows is set to "unverified"
- * (these rows will be deleted from the index table in phase 3), and/or
new put mutations are added with the
- * unverified status. In phase 2, data table mutations are applied. In
phase 3, the status for an index table row is
+ * This method prepares unverified index mutations which are applied to
index tables before
+ * the data table is updated. In the three-phase update approach, in phase
1, the status of
+ * existing index rows is set to "unverified" (these rows will be deleted
from the index table
+ * in phase 3), and/or new put mutations are added with the unverified
status. In phase 2,
+ * data table mutations are applied. In phase 3, the status for an index
table row is
* either set to "verified" or the row is deleted.
*/
private void preparePreIndexMutations(BatchMutateContext context,
@@ -1167,7 +1173,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
private void identifyMutationTypes(MiniBatchOperationInProgress<Mutation>
miniBatchOp,
- BatchMutateContext context) {
+ BatchMutateContext context)
throws IOException {
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
if (this.builder.returnResult(m) && miniBatchOp.size() == 1) {
@@ -1175,11 +1181,21 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
if (this.builder.isAtomicOp(m) || this.builder.returnResult(m)) {
context.hasAtomic = true;
- if (context.hasDelete) {
+ if (context.hasRowDelete) {
return;
}
} else if (m instanceof Delete) {
- context.hasDelete = true;
+ CellScanner scanner = m.cellScanner();
+ if (m.isEmpty()) {
+ context.hasRowDelete = true;
+ } else {
+ while (scanner.advance()) {
+ if (scanner.current().getType() ==
Cell.Type.DeleteFamily) {
+ context.hasRowDelete = true;
+ break;
+ }
+ }
+ }
}
if (context.hasAtomic || context.returnResult) {
return;
@@ -1296,7 +1312,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
identifyMutationTypes(miniBatchOp, context);
context.populateOriginalMutations(miniBatchOp);
- if (context.hasDelete) {
+ if (context.hasRowDelete) {
// Need to add cell tags to Delete Marker before we do any index
processing
// since we add tags to tables which doesn't have indexes also.
ServerIndexUtil.setDeleteAttributes(miniBatchOp);
@@ -1319,7 +1335,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
long start = EnvironmentEdgeManager.currentTimeMillis();
context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put,
Put>>(context.rowsToLock.size());
if (context.hasGlobalIndex || context.hasTransform ||
context.hasAtomic ||
- context.returnResult || context.hasDelete ||
(context.hasUncoveredIndex &&
+ context.returnResult || context.hasRowDelete ||
(context.hasUncoveredIndex &&
isPartialUncoveredIndexMutation(indexMetaData,
miniBatchOp))) {
getCurrentRowStates(c, context);
}