kadirozde commented on a change in pull request #603: PHOENIX-5478 IndexTool
mapper task should not timeout
URL: https://github.com/apache/phoenix/pull/603#discussion_r337759550
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -1087,116 +1087,140 @@ private static PTable deserializeTable(byte[] b) {
throw new RuntimeException(e);
}
}
-
- private RegionScanner rebuildIndices(final RegionScanner innerScanner,
final Region region, final Scan scan,
- Configuration config) throws IOException {
- byte[] indexMetaData =
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
- boolean useProto = true;
- // for backward compatibility fall back to look up by the old attribute
- if (indexMetaData == null) {
- useProto = false;
- indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ private class IndexRebuildRegionScanner extends BaseRegionScanner {
+ private long pageSizeInRows = Long.MAX_VALUE;
+ private int rowCount = 0;
+ private boolean hasMore;
+ private final int maxBatchSize;
+ private MutationList mutations;
+ private final long maxBatchSizeBytes;
+ private final long blockingMemstoreSize;
+ private final byte[] clientVersionBytes;
+ private List<Cell> results = new ArrayList<Cell>();
+ private byte[] indexMetaData;
+ private boolean useProto = true;
+ private Scan scan;
+ private RegionScanner innerScanner;
+ final Region region;
+
+ IndexRebuildRegionScanner (final RegionScanner innerScanner, final
Region region, final Scan scan,
+ final Configuration config) {
+ super(innerScanner);
+ if
(scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+ pageSizeInRows =
config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+
QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+ }
+
+ maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ mutations = new MutationList(maxBatchSize);
+ maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ blockingMemstoreSize = getBlockingMemstoreSize(region, config);
+ clientVersionBytes =
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+ indexMetaData =
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+ if (indexMetaData == null) {
+ useProto = false;
+ indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ }
+ this.scan = scan;
+ this.innerScanner = innerScanner;
+ this.region = region;
}
- byte[] clientVersionBytes =
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
- boolean hasMore;
- int rowCount = 0;
- try {
- int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
- long maxBatchSizeBytes =
config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
- final long blockingMemstoreSize = getBlockingMemstoreSize(region,
config);
- MutationList mutations = new MutationList(maxBatchSize);
- region.startRegionOperation();
- byte[] uuidValue = ServerCacheClient.generateId();
- synchronized (innerScanner) {
- do {
- List<Cell> results = new ArrayList<Cell>();
- hasMore = innerScanner.nextRaw(results);
- if (!results.isEmpty()) {
- Put put = null;
- Delete del = null;
- for (Cell cell : results) {
-
- if (KeyValue.Type.codeToType(cell.getTypeByte())
== KeyValue.Type.Put) {
- if (put == null) {
- put = new Put(CellUtil.cloneRow(cell));
- put.setAttribute(useProto ?
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-
put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-
put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
- mutations.add(put);
- // Since we're replaying existing
mutations, it makes no sense to write them to the wal
- put.setDurability(Durability.SKIP_WAL);
- }
- put.add(cell);
- } else {
- if (del == null) {
- del = new Delete(CellUtil.cloneRow(cell));
- del.setAttribute(useProto ?
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-
del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-
del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
- mutations.add(del);
- // Since we're replaying existing
mutations, it makes no sense to write them to the wal
- del.setDurability(Durability.SKIP_WAL);
+ @Override
+ public RegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() { return hasMore; }
+
+ @Override
+ public void close() throws IOException { innerScanner.close(); }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ try {
+ byte[] uuidValue = ServerCacheClient.generateId();
+ synchronized (this) {
+ do {
+ List<Cell> row = new ArrayList<Cell>();
+ hasMore = innerScanner.nextRaw(row);
+ if (!row.isEmpty()) {
+ Put put = null;
+ Delete del = null;
+ for (Cell cell : row) {
+
+ if
(KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null) {
+ put = new Put(CellUtil.cloneRow(cell));
+ put.setAttribute(useProto ?
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+
put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+
put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+ mutations.add(put);
+ // Since we're replaying existing
mutations, it makes no sense to write them to the wal
+ put.setDurability(Durability.SKIP_WAL);
+ }
+ put.add(cell);
+ } else {
+ if (del == null) {
+ del = new
Delete(CellUtil.cloneRow(cell));
+ del.setAttribute(useProto ?
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+
del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+
del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+ mutations.add(del);
+ // Since we're replaying existing
mutations, it makes no sense to write them to the wal
+ del.setDurability(Durability.SKIP_WAL);
+ }
+ del.addDeleteMarker(cell);
}
- del.addDeleteMarker(cell);
}
+ if (ServerUtil.readyToCommit(mutations.size(),
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ checkForRegionClosing();
+ commitBatchWithRetries(region, mutations,
blockingMemstoreSize);
+ uuidValue = ServerCacheClient.generateId();
+ mutations.clear();
+ }
+ rowCount++;
}
- if (ServerUtil.readyToCommit(mutations.size(),
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- checkForRegionClosing();
- commitBatchWithRetries(region, mutations,
blockingMemstoreSize);
- uuidValue = ServerCacheClient.generateId();
- mutations.clear();
- }
- rowCount++;
+
+ } while (hasMore && rowCount < pageSizeInRows);
+ if (!mutations.isEmpty()) {
+ checkForRegionClosing();
+ commitBatchWithRetries(region, mutations,
blockingMemstoreSize);
}
-
- } while (hasMore);
- if (!mutations.isEmpty()) {
- checkForRegionClosing();
- commitBatchWithRetries(region, mutations,
blockingMemstoreSize);
+ }
+ } catch (IOException e) {
+ hasMore = false;
+ LOGGER.error("IOException during rebuilding: " +
Throwables.getStackTraceAsString(e));
+ throw e;
+ } finally {
+ if (!hasMore) {
+ region.closeRegionOperation();
}
}
- } catch (IOException e) {
- LOGGER.error("IOException during rebuilding: " +
Throwables.getStackTraceAsString(e));
- throw e;
- } finally {
- region.closeRegionOperation();
+ byte[] rowCountBytes =
PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
Review comment:
The row count is used to show the progress of the MR job; it is not really
needed for correctness. However, if an exception occurs, the mapper task will
receive the exception and no row count will be returned to it. The mapper task
will then start from the beginning and rebuild those rows again. When that
happens, I expect the row count to be wrong, because the mapper task will
double count the rows that are rebuilt a second time. I will think about
improving this, maybe not in this JIRA.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services