gokceni commented on a change in pull request #469: PHOENIX-5156 Consistent 
Global Indexes for Non-Transactional Tables
URL: https://github.com/apache/phoenix/pull/469#discussion_r280630543
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
 ##########
 @@ -506,30 +623,130 @@ public void 
preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", 
indexUpdates.size());
           byte[] tableName = 
c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
-          Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = 
indexUpdates.iterator();
+          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = 
indexUpdates.iterator();
           List<Mutation> localUpdates = new 
ArrayList<Mutation>(indexUpdates.size());
+          postIndexUpdates = new ArrayList<>(indexUpdates.size());
+          indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
           while(indexUpdatesItr.hasNext()) {
-              Pair<Mutation, byte[]> next = indexUpdatesItr.next();
-              if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
-                  localUpdates.add(next.getFirst());
+              Pair<Pair<Mutation, byte[]>, byte[]> next = 
indexUpdatesItr.next();
+              if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 
0) {
+                  localUpdates.add(next.getFirst().getFirst());
                   indexUpdatesItr.remove();
               }
+              else {
+                  // get index maintainer for this index table
+                  IndexMaintainer indexMaintainer = 
getIndexMaintainer(maintainers, next.getFirst().getSecond());
+                  if (indexMaintainer == null) {
+                      throw new DoNotRetryIOException(
+                              "preBatchMutateWithExceptions: indexMaintainer 
is null " +
+                                      
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+                  }
+                  byte[] emptyCF = 
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+                  byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+                  // add the VERIFIED cell, which is the empty cell
+                  Mutation m = next.getFirst().getFirst();
+                  boolean rebuild = 
PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+                  long ts = getMaxTimestamp(m);
+                  if (rebuild) {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+                      }
+                  } else {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, 
FALSE_BYTES);
+                          // Ignore post index updates (i.e., the third write 
phase updates) for this row if it is
+                          // going through concurrent updates
+                          RowKey rowKey = new RowKey(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              Put put = new Put(m.getRow());
+                              put.addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+                              postIndexUpdates.add(new Pair<>(new Pair<>(put, 
next.getFirst().getSecond()), next.getSecond()));
+                          }
+                      } else {
+                          // For a delete mutation, first unverify the existing 
row in the index table and then delete
+                          // the row from the index table after deleting the 
corresponding row from the data table
+                          indexUpdatesItr.remove();
+                          Put put = new Put(m.getRow());
+                          put.addColumn(emptyCF, emptyCQ, ts, FALSE_BYTES);
+                          indexUpdatesForDeletes.add(new Pair<>(put, 
next.getFirst().getSecond()));
+                          // Ignore post index updates (i.e., the third write 
phase updates) for this row if it is
+                          // going through concurrent updates
+                          RowKey rowKey = new RowKey(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              postIndexUpdates.add(next);
+                          }
+                      }
+                  }
+              }
           }
           if (!localUpdates.isEmpty()) {
               miniBatchOp.addOperationsFromCP(0,
                   localUpdates.toArray(new Mutation[localUpdates.size()]));
           }
-          if (!indexUpdates.isEmpty()) {
-              context.indexUpdates = indexUpdates;
-              // write index updates to WAL
-              if (durability != Durability.SKIP_WAL) {
-                  // we have all the WAL durability, so we just update the WAL 
entry and move on
-                  for (Pair<Mutation, byte[]> entry : indexUpdates) {
-                    edit.add(new IndexedKeyValue(entry.getSecond(), 
entry.getFirst()));
-                  }              
+          if (!indexUpdatesForDeletes.isEmpty()) {
+              context.indexUpdates = indexUpdatesForDeletes;
+          }
+
+          if (!indexUpdates.isEmpty() && context.indexUpdates.isEmpty()) {
+              context.indexUpdates = new ArrayList<>(indexUpdates.size());
+          }
+          for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
+                  context.indexUpdates.add(update.getFirst());
+          }
+      }
+      // Sleep for one millisecond if we have prepared the index updates in 
less than 1 ms. The sleep is necessary to
+      // get different timestamps for concurrent batches that share common 
rows. It is very rare that the index updates
+      // can be prepared in less than one millisecond
+      if (!context.rowLocks.isEmpty() && now == 
EnvironmentEdgeManager.currentTimeMillis()) {
+          Thread.sleep(1);
+          LOG.debug("slept 1ms");
+      }
+      // Release the locks before making RPC calls for index updates
+      for (RowLock rowLock : context.rowLocks) {
+          rowLock.release();
+      }
+      context.rowLocks.clear();
+      // Do the index updates
 
 Review comment:
   Suggestion: Refactor this method into separate functions, one for each step (e.g., preparing the index updates, releasing the row locks, and doing the index updates).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to