swaroopak commented on a change in pull request #701: PHOENIX-5709 Simplify
index update generation code for consistent glo…
URL: https://github.com/apache/phoenix/pull/701#discussion_r376101927
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -533,13 +666,139 @@ private void
handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironmen
}
}
- private void prepareIndexMutations(
- ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Mutation> miniBatchOp,
- BatchMutateContext context,
- Collection<? extends Mutation> mutations,
- long now,
- PhoenixIndexMetaData indexMetaData) throws Throwable {
+ /**
+ * Retrieve the last committed row state. This method is called only
for regular updates
+ */
+ private void
getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+ BatchMutateContext context) throws
IOException {
+ Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+ for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+ }
+ Scan scan = new Scan();
+ ScanRanges scanRanges = ScanRanges.createPointLookup(new
ArrayList<KeyRange>(keys));
+ scanRanges.initializeScan(scan);
+ SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+ scan.setFilter(skipScanFilter);
+ context.currentRowStates = new HashMap<ImmutableBytesPtr,
Put>(context.rowsToLock.size());
+ try (RegionScanner scanner =
c.getEnvironment().getRegion().getScanner(scan)) {
+ boolean more = true;
+ while(more) {
+ List<Cell> cells = new ArrayList<Cell>();
+ more = scanner.next(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ byte[] rowKey = CellUtil.cloneRow(cells.get(0));
+ Put put = new Put(rowKey);
+ for (Cell cell : cells) {
+ put.add(cell);
+ }
+ context.currentRowStates.put(new ImmutableBytesPtr(rowKey),
put);
+ }
+ }
+ }
+
+ private Collection<Cell> getCollection(Mutation m) {
+ Collection<Cell> collection = new ArrayList<>();
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ collection.addAll(cells);
+ }
+ return collection;
+ }
+
+ /**
+ * Generate the index updates from the mutations without considering the
previous row state. This method is called
+ * to rebuild index rows from the existing data table rows. This
method is used only for global indexes
+ */
+ private void prepareIndexMutationsForIndexRebuild(BatchMutateContext
context,
+ List<IndexMaintainer>
maintainers,
+ Collection<? extends
Mutation> pendingMutations) throws IOException {
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ for (Mutation mutation : pendingMutations) {
+ ImmutableBytesPtr rowKeyPtr = new
ImmutableBytesPtr(mutation.getRow());
+ ValueGetter valueGetter = new
IndexRebuildRegionScanner.SimpleValueGetter((Put) mutation);
+ Put indexPut =
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
valueGetter,
+ rowKeyPtr, getMaxTimestamp(mutation), null, null);
+ if (indexPut == null) {
+ byte[] indexRowKey =
indexMaintainer.buildRowKey(valueGetter, rowKeyPtr,
+ null, null, HConstants.LATEST_TIMESTAMP);
+ indexPut = new Put(indexRowKey);
+ }
+
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier(),
getMaxTimestamp(mutation), VERIFIED_BYTES);
+ context.indexUpdates.put(hTableInterfaceReference, new
Pair<>((Mutation)indexPut, mutation.getRow()));
+ }
+ }
+ }
+
+ /**
+ * Generate the index update for a data row from the mutation that is
obtained by merging the previous data row
+ * state with the pending row mutation.
+ */
+ private void
prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+ BatchMutateContext context,
+ List<IndexMaintainer> maintainers,
+ Collection<? extends Mutation>
pendingMutations) throws IOException {
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ byte[] regionStartKey = null;
+ byte[] regionEndKey = null;
+ if (indexMaintainer.isLocalIndex()) {
+ regionStartKey =
c.getEnvironment().getRegion().getRegionInfo().getStartKey();
+ regionEndKey =
c.getEnvironment().getRegion().getRegionInfo().getEndKey();
+ }
+ for (Mutation mutation : pendingMutations) {
+ ImmutableBytesPtr rowKeyPtr = new
ImmutableBytesPtr(mutation.getRow());
+ Put currentRow = context.currentRowStates.get(rowKeyPtr);
+ ValueGetter currentRowVG = (currentRow == null) ? null :
+ new
IndexRebuildRegionScanner.SimpleValueGetter(currentRow);
+ long ts = getTimestamp(mutation);
+ if (mutation instanceof Put) {
+ Put mergedRow = context.mergedRowStates.get(rowKeyPtr);
+ ValueGetter mergedRowVG = new
IndexRebuildRegionScanner.SimpleValueGetter(mergedRow);
+ Put indexPut =
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ mergedRowVG, rowKeyPtr, ts, regionStartKey,
regionEndKey);
+ if (indexPut == null) {
+ // No covered column. Just prepare an index row with
the empty column
+ byte[] indexRowKey =
indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr,
+ regionStartKey, regionEndKey,
HConstants.LATEST_TIMESTAMP);
+ indexPut = new Put(indexRowKey);
+ }
+
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
Review comment:
Why shouldn't this be `dataEmptyKeyValueCF` from IndexMaintainer? Or does
it not matter?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services