This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 48970e9 PHOENIX-6080: Add a check to Index Rebuild jobs to check
region closing before every inner batch (#860)
48970e9 is described below
commit 48970e98e8c0038455942a10e0604fba80885ddb
Author: Swaroopa Kadam <[email protected]>
AuthorDate: Fri Aug 21 10:04:56 2020 -0700
PHOENIX-6080: Add a check to Index Rebuild jobs to check region closing
before every inner batch (#860)
Co-authored-by: s.kadam <[email protected]>
---
.../phoenix/coprocessor/IndexRebuildRegionScanner.java | 18 +++++++++++++++++-
.../coprocessor/UngroupedAggregateRegionObserver.java | 6 +++---
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 7d7ab9d..06a79e7 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -106,6 +106,8 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
"phoenix.index.mr.log.beyond.max.lookback.errors";
public static final boolean
DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false;
private static boolean ignoreIndexRebuildForTesting = false;
+ private final UngroupedAggregateRegionObserver
ungroupedAggregateRegionObserver;
+
public static void setIgnoreIndexRebuildForTesting(boolean ignore) {
ignoreIndexRebuildForTesting = ignore; }
private byte[] indexRowKeyforReadRepair;
private IndexTool.IndexDisableLoggingType disableLoggingVerifyType =
IndexTool.IndexDisableLoggingType.NONE;
@@ -122,9 +124,11 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
@VisibleForTesting
public IndexRebuildRegionScanner(final RegionScanner innerScanner, final
Region region, final Scan scan,
- final RegionCoprocessorEnvironment env)
throws IOException {
+ final RegionCoprocessorEnvironment env,
+ final UngroupedAggregateRegionObserver
ungroupedAggregateRegionObserver) throws IOException {
super(innerScanner, region, scan, env);
this.env = env;
+ this.ungroupedAggregateRegionObserver =
ungroupedAggregateRegionObserver;
indexRowKeyforReadRepair =
scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
if (indexRowKeyforReadRepair != null) {
setReturnCodeForSingleRowRebuild();
@@ -812,6 +816,7 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
indexScan.setCacheBlocks(false);
try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
for (Result result = resultScanner.next(); (result != null);
result = resultScanner.next()) {
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
if (!verifySingleIndexRow(result, indexKeyToMutationMap,
mostRecentIndexRowKeys, oldIndexRowsToBeDeletedList,
verificationPhaseResult, isBeforeRebuild)) {
invalidIndexRows.put(result.getRow(),
indexKeyToMutationMap.get(result.getRow()));
@@ -879,12 +884,14 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
indexUpdates.addAll(mutationList);
batchSize += mutationList.size();
if (batchSize >= maxBatchSize) {
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
indexHTable.batch(indexUpdates);
batchSize = 0;
indexUpdates = new ArrayList<Mutation>(maxBatchSize);
}
}
if (batchSize > 0) {
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
indexHTable.batch(indexUpdates);
}
batchSize = 0;
@@ -893,12 +900,14 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
indexUpdates.add(mutation);
batchSize ++;
if (batchSize >= maxBatchSize) {
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
indexHTable.batch(indexUpdates);
batchSize = 0;
indexUpdates = new ArrayList<Mutation>(maxBatchSize);
}
}
if (batchSize > 0) {
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
indexHTable.batch(indexUpdates);
}
if (verify) {
@@ -1309,6 +1318,12 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
return false;
}
do {
+ /*
+ If region is closing and there are large number of
rows being verified/rebuilt with IndexTool,
+ not having this check will impact/delay the region
closing -- affecting the availability
+ as this method holds the read lock on the region.
+ * */
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
List<Cell> row = new ArrayList<Cell>();
hasMore = localScanner.nextRaw(row);
if (!row.isEmpty()) {
@@ -1412,6 +1427,7 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
// collect row keys that have been modified in the given
time-range
// up to the size of page to build skip scan filter
do {
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
hasMoreIncr = scanner.nextRaw(row);
if (!row.isEmpty()) {
keys.add(PVarbinary.INSTANCE.getKeyRange(CellUtil.cloneRow(row.get(0))));
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index af09c3c..50a2d89 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -352,7 +352,7 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
return s;
}
- public static class MutationList extends ArrayList<Mutation> {
+ public static class MutationList extends ArrayList<Mutation> {
private long byteSize = 0l;
public MutationList() {
super();
@@ -1103,13 +1103,13 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
if (oldCoproc) {
return new IndexerRegionScanner(scanner, region, scan, env,
this);
} else {
- return new IndexRebuildRegionScanner(scanner, region, scan,
env);
+ return new IndexRebuildRegionScanner(scanner, region, scan,
env, this);
}
}
if (oldCoproc) {
return new IndexerRegionScanner(innerScanner, region, scan, env,
this);
} else {
- return new IndexRebuildRegionScanner(innerScanner, region, scan,
env);
+ return new IndexRebuildRegionScanner(innerScanner, region, scan,
env, this);
}
}