kadirozde commented on a change in pull request #603: PHOENIX-5478 IndexTool
mapper task should not timeout
URL: https://github.com/apache/phoenix/pull/603#discussion_r337797313
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -1087,116 +1087,140 @@ private static PTable deserializeTable(byte[] b) {
throw new RuntimeException(e);
}
}
-
- private RegionScanner rebuildIndices(final RegionScanner innerScanner,
final Region region, final Scan scan,
- Configuration config) throws IOException {
- byte[] indexMetaData =
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
- boolean useProto = true;
- // for backward compatibility fall back to look up by the old attribute
- if (indexMetaData == null) {
- useProto = false;
- indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ private class IndexRebuildRegionScanner extends BaseRegionScanner {
+ private long pageSizeInRows = Long.MAX_VALUE;
+ private int rowCount = 0;
+ private boolean hasMore;
+ private final int maxBatchSize;
+ private MutationList mutations;
+ private final long maxBatchSizeBytes;
+ private final long blockingMemstoreSize;
+ private final byte[] clientVersionBytes;
+ private List<Cell> results = new ArrayList<Cell>();
+ private byte[] indexMetaData;
+ private boolean useProto = true;
+ private Scan scan;
+ private RegionScanner innerScanner;
+ final Region region;
+
+ IndexRebuildRegionScanner (final RegionScanner innerScanner, final
Region region, final Scan scan,
+ final Configuration config) {
+ super(innerScanner);
+ if
(scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+ pageSizeInRows =
config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+
QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+ }
+
+ maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ mutations = new MutationList(maxBatchSize);
+ maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ blockingMemstoreSize = getBlockingMemstoreSize(region, config);
+ clientVersionBytes =
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+ indexMetaData =
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+ if (indexMetaData == null) {
+ useProto = false;
+ indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ }
+ this.scan = scan;
+ this.innerScanner = innerScanner;
+ this.region = region;
}
- byte[] clientVersionBytes =
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
- boolean hasMore;
- int rowCount = 0;
- try {
- int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
- long maxBatchSizeBytes =
config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
- final long blockingMemstoreSize = getBlockingMemstoreSize(region,
config);
- MutationList mutations = new MutationList(maxBatchSize);
- region.startRegionOperation();
- byte[] uuidValue = ServerCacheClient.generateId();
- synchronized (innerScanner) {
- do {
- List<Cell> results = new ArrayList<Cell>();
- hasMore = innerScanner.nextRaw(results);
- if (!results.isEmpty()) {
- Put put = null;
- Delete del = null;
- for (Cell cell : results) {
-
- if (KeyValue.Type.codeToType(cell.getTypeByte())
== KeyValue.Type.Put) {
- if (put == null) {
- put = new Put(CellUtil.cloneRow(cell));
- put.setAttribute(useProto ?
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-
put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-
put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
- mutations.add(put);
- // Since we're replaying existing
mutations, it makes no sense to write them to the wal
- put.setDurability(Durability.SKIP_WAL);
- }
- put.add(cell);
- } else {
- if (del == null) {
- del = new Delete(CellUtil.cloneRow(cell));
- del.setAttribute(useProto ?
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-
del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-
del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
- mutations.add(del);
- // Since we're replaying existing
mutations, it makes no sense to write them to the wal
- del.setDurability(Durability.SKIP_WAL);
+ @Override
+ public RegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() { return hasMore; }
+
+ @Override
+ public void close() throws IOException { innerScanner.close(); }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ try {
+ byte[] uuidValue = ServerCacheClient.generateId();
+ synchronized (this) {
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services