This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.14-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 1916b3d76cdc9a190680476828442da1dd9b91cc
Author: Kiran Kumar Maturi <maturi.ki...@gmail.com>
AuthorDate: Fri Feb 22 09:45:13 2019 +0530

    PHOENIX-5137 check region close before committing a batch for index rebuild
---
 .../UngroupedAggregateRegionObserver.java          | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 703ff97..2eb15a1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -260,7 +260,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
           return;
       }
 
-        Mutation[] mutationArray = new Mutation[mutations.size()];
+       Mutation[] mutationArray = new Mutation[mutations.size()];
       // When memstore size reaches blockingMemstoreSize we are waiting 3 
seconds for the
       // flush happen which decrease the memstore size and then writes allowed 
on the region.
       for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > 
blockingMemstoreSize && i < 30; i++) {
@@ -371,6 +371,17 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             super.clear();
         }
     }
+
+   private long getBlockingMemstoreSize(Region region, Configuration conf) {
+       long flushSize = region.getTableDesc().getMemStoreFlushSize();
+
+       if (flushSize <= 0) {
+           flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+                   HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+       }
+       return flushSize * 
(conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+                   HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1);
+   }
     
     @Override
     protected RegionScanner doPostScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final 
RegionScanner s) throws IOException, SQLException {
@@ -487,12 +498,6 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         MutationList mutations = new MutationList();
         boolean needToWrite = false;
         Configuration conf = env.getConfiguration();
-        long flushSize = region.getTableDesc().getMemStoreFlushSize();
-
-        if (flushSize <= 0) {
-            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
-                    HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
-        }
 
         /**
          * Slow down the writes if the memstore size more than
@@ -500,9 +505,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
          * bytes. This avoids flush storm to hdfs for cases like index 
building where reads and
          * write happen to all the table regions in the server.
          */
-        final long blockingMemStoreSize = flushSize * (
-                conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
-                        
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+        final long blockingMemStoreSize = getBlockingMemstoreSize(region, 
conf) ;
 
         boolean buildLocalIndex = indexMaintainers != null && 
dataColumns==null && !localIndexScan;
         if(buildLocalIndex) {
@@ -1043,6 +1046,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
             long maxBatchSizeBytes = 
config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
                 QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+            final long blockingMemstoreSize = getBlockingMemstoreSize(region, 
config);
             MutationList mutations = new MutationList(maxBatchSize);
             region.startRegionOperation();
             byte[] uuidValue = ServerCacheClient.generateId();
@@ -1084,7 +1088,8 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                             }
                         }
                         if (ServerUtil.readyToCommit(mutations.size(), 
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commitBatchWithRetries(region, mutations, -1);
+                            checkForRegionClosingOrSplitting();
+                            commitBatchWithRetries(region, mutations, 
blockingMemstoreSize);
                             uuidValue = ServerCacheClient.generateId();
                             mutations.clear();
                         }
@@ -1093,7 +1098,8 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                     
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
-                    commitBatchWithRetries(region, mutations, -1);
+                    checkForRegionClosingOrSplitting();
+                    commitBatchWithRetries(region, mutations, 
blockingMemstoreSize);
                 }
             }
         } catch (IOException e) {

Reply via email to