gokceni commented on a change in pull request #469: PHOENIX-5156 Consistent
Global Indexes for Non-Transactional Tables
URL: https://github.com/apache/phoenix/pull/469#discussion_r280630543
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
##########
@@ -506,30 +623,130 @@ public void
preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count",
indexUpdates.size());
byte[] tableName =
c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
- Iterator<Pair<Mutation, byte[]>> indexUpdatesItr =
indexUpdates.iterator();
+ Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr =
indexUpdates.iterator();
List<Mutation> localUpdates = new
ArrayList<Mutation>(indexUpdates.size());
+ postIndexUpdates = new ArrayList<>(indexUpdates.size());
+ indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
while(indexUpdatesItr.hasNext()) {
- Pair<Mutation, byte[]> next = indexUpdatesItr.next();
- if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
- localUpdates.add(next.getFirst());
+ Pair<Pair<Mutation, byte[]>, byte[]> next =
indexUpdatesItr.next();
+ if (Bytes.compareTo(next.getFirst().getSecond(), tableName) ==
0) {
+ localUpdates.add(next.getFirst().getFirst());
indexUpdatesItr.remove();
}
+ else {
+ // get index maintainer for this index table
+ IndexMaintainer indexMaintainer =
getIndexMaintainer(maintainers, next.getFirst().getSecond());
+ if (indexMaintainer == null) {
+ throw new DoNotRetryIOException(
+ "preBatchMutateWithExceptions: indexMaintainer
is null " +
+
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ }
+ byte[] emptyCF =
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ // add the VERIFIED cell, which is the empty cell
+ Mutation m = next.getFirst().getFirst();
+ boolean rebuild =
PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+ long ts = getMaxTimestamp(m);
+ if (rebuild) {
+ if (m instanceof Put) {
+ ((Put)m).addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+ }
+ } else {
+ if (m instanceof Put) {
+ ((Put)m).addColumn(emptyCF, emptyCQ, ts,
FALSE_BYTES);
+ // Ignore post index updates (i.e., the third write
phase updates) for this row if it is
+ // going through concurrent updates
+ RowKey rowKey = new RowKey(next.getSecond());
+ if (!context.pendingRows.contains(rowKey)) {
+ Put put = new Put(m.getRow());
+ put.addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+ postIndexUpdates.add(new Pair<>(new Pair<>(put,
next.getFirst().getSecond()), next.getSecond()));
+ }
+ } else {
+ // For a delete mutation, first unverify the existing
row in the index table and then delete
+ // the row from the index table after deleting the
corresponding row from the data table
+ indexUpdatesItr.remove();
+ Put put = new Put(m.getRow());
+ put.addColumn(emptyCF, emptyCQ, ts, FALSE_BYTES);
+ indexUpdatesForDeletes.add(new Pair<>(put,
next.getFirst().getSecond()));
+ // Ignore post index updates (i.e., the third write
phase updates) for this row if it is
+ // going through concurrent updates
+ RowKey rowKey = new RowKey(next.getSecond());
+ if (!context.pendingRows.contains(rowKey)) {
+ postIndexUpdates.add(next);
+ }
+ }
+ }
+ }
}
if (!localUpdates.isEmpty()) {
miniBatchOp.addOperationsFromCP(0,
localUpdates.toArray(new Mutation[localUpdates.size()]));
}
- if (!indexUpdates.isEmpty()) {
- context.indexUpdates = indexUpdates;
- // write index updates to WAL
- if (durability != Durability.SKIP_WAL) {
- // we have all the WAL durability, so we just update the WAL
entry and move on
- for (Pair<Mutation, byte[]> entry : indexUpdates) {
- edit.add(new IndexedKeyValue(entry.getSecond(),
entry.getFirst()));
- }
+ if (!indexUpdatesForDeletes.isEmpty()) {
+ context.indexUpdates = indexUpdatesForDeletes;
+ }
+
+ if (!indexUpdates.isEmpty() && context.indexUpdates.isEmpty()) {
+ context.indexUpdates = new ArrayList<>(indexUpdates.size());
+ }
+ for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
+ context.indexUpdates.add(update.getFirst());
+ }
+ }
+ // Sleep for one millisecond if we have prepared the index updates in
less than 1 ms. The sleep is necessary to
+ // get different timestamps for concurrent batches that share common
rows. It is very rare that the index updates
+ // can be prepared in less than one millisecond
+ if (!context.rowLocks.isEmpty() && now ==
EnvironmentEdgeManager.currentTimeMillis()) {
+ Thread.sleep(1);
+ LOG.debug("slept 1ms");
+ }
+ // Release the locks before making RPC calls for index updates
+ for (RowLock rowLock : context.rowLocks) {
+ rowLock.release();
+ }
+ context.rowLocks.clear();
+ // Do the index updates
Review comment:
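Suggestion: Refactor this method into separate functions, one for each step (for example, preparing the index updates, releasing the row locks, and doing the index updates).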
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services