This is an automated email from the ASF dual-hosted git repository.
tdsilva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new d9f6e96 PHOENIX-5137 check region close before commiting a batch for
index rebuild
d9f6e96 is described below
commit d9f6e969e66b99604a654a3ba51084b622723a74
Author: Kiran Kumar Maturi <[email protected]>
AuthorDate: Tue Feb 26 17:09:41 2019 +0530
PHOENIX-5137 check region close before commiting a batch for index rebuild
---
.../UngroupedAggregateRegionObserver.java | 30 +++++++++++++---------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6b27a88..40b6faa 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -392,7 +392,18 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
super.clear();
}
}
-
+
+ private long getBlockingMemstoreSize(Region region, Configuration conf) {
+ long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
+
+ if (flushSize <= 0) {
+ flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+ TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
+ }
+ return flushSize *
(conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+ }
+
@Override
protected RegionScanner doPostScannerOpen(final
ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final
RegionScanner s) throws IOException, SQLException {
RegionCoprocessorEnvironment env = c.getEnvironment();
@@ -524,12 +535,6 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
MutationList mutations = new MutationList();
boolean needToWrite = false;
Configuration conf = env.getConfiguration();
- long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
-
- if (flushSize <= 0) {
- flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
- TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
- }
/**
* Slow down the writes if the memstore size more than
@@ -537,9 +542,7 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
* bytes. This avoids flush storm to hdfs for cases like index
building where reads and
* write happen to all the table regions in the server.
*/
- final long blockingMemStoreSize = flushSize * (
- conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
-
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+ final long blockingMemStoreSize = getBlockingMemstoreSize(region,
conf);
boolean buildLocalIndex = indexMaintainers != null &&
dataColumns==null && !localIndexScan;
if(buildLocalIndex) {
@@ -1101,6 +1104,7 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
long maxBatchSizeBytes =
config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ final long blockingMemstoreSize = getBlockingMemstoreSize(region,
config);
MutationList mutations = new MutationList(maxBatchSize);
region.startRegionOperation();
byte[] uuidValue = ServerCacheClient.generateId();
@@ -1142,7 +1146,8 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
}
}
if (ServerUtil.readyToCommit(mutations.size(),
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- commitBatchWithRetries(region, mutations, -1);
+ checkForRegionClosing();
+ commitBatchWithRetries(region, mutations,
blockingMemstoreSize);
uuidValue = ServerCacheClient.generateId();
mutations.clear();
}
@@ -1151,7 +1156,8 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
} while (hasMore);
if (!mutations.isEmpty()) {
- commitBatchWithRetries(region, mutations, -1);
+ checkForRegionClosing();
+ commitBatchWithRetries(region, mutations,
blockingMemstoreSize);
}
}
} catch (IOException e) {