kadirozde commented on a change in pull request #603: PHOENIX-5478 IndexTool 
mapper task should not timeout
URL: https://github.com/apache/phoenix/pull/603#discussion_r337683849
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 ##########
 @@ -1087,116 +1087,140 @@ private static PTable deserializeTable(byte[] b) {
             throw new RuntimeException(e);
         }
     }
-    
-    private RegionScanner rebuildIndices(final RegionScanner innerScanner, 
final Region region, final Scan scan,
-            Configuration config) throws IOException {
-        byte[] indexMetaData = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-        boolean useProto = true;
-        // for backward compatibility fall back to look up by the old attribute
-        if (indexMetaData == null) {
-            useProto = false;
-            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+    private class IndexRebuildRegionScanner extends BaseRegionScanner {
+        private long pageSizeInRows = Long.MAX_VALUE;
+        private int rowCount = 0;
+        private boolean hasMore;
+        private final int maxBatchSize;
+        private MutationList mutations;
+        private final long maxBatchSizeBytes;
+        private final long blockingMemstoreSize;
+        private final byte[] clientVersionBytes;
+        private List<Cell> results = new ArrayList<Cell>();
+        private byte[] indexMetaData;
+        private boolean useProto = true;
+        private Scan scan;
+        private RegionScanner innerScanner;
+        final Region region;
+
+        IndexRebuildRegionScanner (final RegionScanner innerScanner, final 
Region region, final Scan scan,
+                                   final Configuration config) {
+            super(innerScanner);
+            if 
(scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+                pageSizeInRows = 
config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+                        
QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+            }
+
+            maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            mutations = new MutationList(maxBatchSize);
+            maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+            blockingMemstoreSize = getBlockingMemstoreSize(region, config);
+            clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+            indexMetaData = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+            if (indexMetaData == null) {
+                useProto = false;
+                indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+            }
+            this.scan = scan;
+            this.innerScanner = innerScanner;
+            this.region = region;
         }
-        byte[] clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
-        boolean hasMore;
-        int rowCount = 0;
-        try {
-            int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            long maxBatchSizeBytes = 
config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
-                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
-            final long blockingMemstoreSize = getBlockingMemstoreSize(region, 
config);
-            MutationList mutations = new MutationList(maxBatchSize);
-            region.startRegionOperation();
-            byte[] uuidValue = ServerCacheClient.generateId();
-            synchronized (innerScanner) {
-                do {
-                    List<Cell> results = new ArrayList<Cell>();
-                    hasMore = innerScanner.nextRaw(results);
-                    if (!results.isEmpty()) {
-                        Put put = null;
-                        Delete del = null;
-                        for (Cell cell : results) {
-
-                            if (KeyValue.Type.codeToType(cell.getTypeByte()) 
== KeyValue.Type.Put) {
-                                if (put == null) {
-                                    put = new Put(CellUtil.cloneRow(cell));
-                                    put.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-                                    
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    
put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                                        
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-                                    
put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
-                                    mutations.add(put);
-                                    // Since we're replaying existing 
mutations, it makes no sense to write them to the wal
-                                    put.setDurability(Durability.SKIP_WAL);
-                                }
-                                put.add(cell);
-                            } else {
-                                if (del == null) {
-                                    del = new Delete(CellUtil.cloneRow(cell));
-                                    del.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-                                    
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    
del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                                        
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-                                    
del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
-                                    mutations.add(del);
-                                    // Since we're replaying existing 
mutations, it makes no sense to write them to the wal
-                                    del.setDurability(Durability.SKIP_WAL);
+        @Override
+        public RegionInfo getRegionInfo() {
+            return region.getRegionInfo();
+        }
+
+        @Override
+        public boolean isFilterDone() { return hasMore; }
+
+        @Override
+        public void close() throws IOException { innerScanner.close(); }
+
+        @Override
+        public boolean next(List<Cell> results) throws IOException {
+            try {
+                byte[] uuidValue = ServerCacheClient.generateId();
+                synchronized (this) {
 
 Review comment:
   @BinShi-SecularBird, yes the previous code synchronized the access the 
innerScanner object. Actually, I am not sure why we need to synchronization 
here, how it would be different if you synchronize the access to the 
innerScanner vs this. Regardless, I agree with you that I should have preserved 
the previous code. I will revert this. Good catch.  

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to