This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 48970e9  PHOENIX-6080: Add a check to Index Rebuild jobs to check region closing before every inner batch (#860)
48970e9 is described below

commit 48970e98e8c0038455942a10e0604fba80885ddb
Author: Swaroopa Kadam <[email protected]>
AuthorDate: Fri Aug 21 10:04:56 2020 -0700

    PHOENIX-6080: Add a check to Index Rebuild jobs to check region closing before every inner batch (#860)
    
    Co-authored-by: s.kadam <[email protected]>
---
 .../phoenix/coprocessor/IndexRebuildRegionScanner.java | 18 +++++++++++++++++-
 .../coprocessor/UngroupedAggregateRegionObserver.java  |  6 +++---
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 7d7ab9d..06a79e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -106,6 +106,8 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         "phoenix.index.mr.log.beyond.max.lookback.errors";
     public static final boolean 
DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false;
     private static boolean ignoreIndexRebuildForTesting  = false;
+    private final UngroupedAggregateRegionObserver 
ungroupedAggregateRegionObserver;
+
     public static void setIgnoreIndexRebuildForTesting(boolean ignore) { 
ignoreIndexRebuildForTesting = ignore; }
     private byte[] indexRowKeyforReadRepair;
     private IndexTool.IndexDisableLoggingType disableLoggingVerifyType = 
IndexTool.IndexDisableLoggingType.NONE;
@@ -122,9 +124,11 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
 
     @VisibleForTesting
     public IndexRebuildRegionScanner(final RegionScanner innerScanner, final 
Region region, final Scan scan,
-                                     final RegionCoprocessorEnvironment env) 
throws IOException {
+            final RegionCoprocessorEnvironment env,
+            final UngroupedAggregateRegionObserver 
ungroupedAggregateRegionObserver) throws IOException {
         super(innerScanner, region, scan, env);
         this.env = env;
+        this.ungroupedAggregateRegionObserver = 
ungroupedAggregateRegionObserver;
         indexRowKeyforReadRepair = 
scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
         if (indexRowKeyforReadRepair != null) {
             setReturnCodeForSingleRowRebuild();
@@ -812,6 +816,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         indexScan.setCacheBlocks(false);
         try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
             for (Result result = resultScanner.next(); (result != null); 
result = resultScanner.next()) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
                 if (!verifySingleIndexRow(result, indexKeyToMutationMap, 
mostRecentIndexRowKeys, oldIndexRowsToBeDeletedList,
                         verificationPhaseResult, isBeforeRebuild)) {
                     invalidIndexRows.put(result.getRow(), 
indexKeyToMutationMap.get(result.getRow()));
@@ -879,12 +884,14 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
                 indexUpdates.addAll(mutationList);
                 batchSize += mutationList.size();
                 if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
                     indexHTable.batch(indexUpdates);
                     batchSize = 0;
                     indexUpdates = new ArrayList<Mutation>(maxBatchSize);
                 }
             }
             if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
                 indexHTable.batch(indexUpdates);
             }
             batchSize = 0;
@@ -893,12 +900,14 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
                 indexUpdates.add(mutation);
                 batchSize ++;
                 if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
                     indexHTable.batch(indexUpdates);
                     batchSize = 0;
                     indexUpdates = new ArrayList<Mutation>(maxBatchSize);
                 }
             }
             if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
                 indexHTable.batch(indexUpdates);
             }
             if (verify) {
@@ -1309,6 +1318,12 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
                     return false;
                 }
                 do {
+                    /*
+                        If the region is closing while a large number of rows are being verified/rebuilt with IndexTool,
+                        skipping this check would delay the region close and hurt availability,
+                        because this method holds the read lock on the region.
+                     */
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
                     List<Cell> row = new ArrayList<Cell>();
                     hasMore = localScanner.nextRaw(row);
                     if (!row.isEmpty()) {
@@ -1412,6 +1427,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
                 // collect row keys that have been modified in the given 
time-range
                 // up to the size of page to build skip scan filter
                 do {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
                     hasMoreIncr = scanner.nextRaw(row);
                     if (!row.isEmpty()) {
                         
keys.add(PVarbinary.INSTANCE.getKeyRange(CellUtil.cloneRow(row.get(0))));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index af09c3c..50a2d89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -352,7 +352,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         return s;
     }
 
-   public static class MutationList extends ArrayList<Mutation> {
+    public static class MutationList extends ArrayList<Mutation> {
         private long byteSize = 0l;
         public MutationList() {
             super();
@@ -1103,13 +1103,13 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             if (oldCoproc) {
                 return new IndexerRegionScanner(scanner, region, scan, env, 
this);
             } else {
-                return new IndexRebuildRegionScanner(scanner, region, scan, 
env);
+                return new IndexRebuildRegionScanner(scanner, region, scan, 
env, this);
             }
         }
         if (oldCoproc) {
             return new IndexerRegionScanner(innerScanner, region, scan, env, 
this);
         } else {
-            return new IndexRebuildRegionScanner(innerScanner, region, scan, 
env);
+            return new IndexRebuildRegionScanner(innerScanner, region, scan, 
env, this);
         }
     }
     

Reply via email to