palashc commented on code in PR #1575:
URL: https://github.com/apache/phoenix/pull/1575#discussion_r1176934202
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java:
##########
@@ -111,4 +145,162 @@ private PhoenixMetaDataCoprocessorHost
getCoprocessorHost() {
return phoenixAccessCoprocessorHost;
}
+ /**
+ * Class that verifies a given row of a SYSTEM.CHILD_LINK table.
+ * An instance of this class is created for each scanner on the table
+ * and used to verify individual rows.
+ */
+ public class ChildLinkMetaDataScanner extends ReadRepairScanner {
+
+ private Table sysCatHTable;
+ private Scan childLinkScan;
+
+ public ChildLinkMetaDataScanner(RegionCoprocessorEnvironment env,
+ Scan scan,
+ RegionScanner scanner) throws
IOException {
+ super(env, scan, scanner);
+ ageThreshold = env.getConfiguration().getLong(
+
QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+
QueryServicesOptions.DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+ sysCatHTable = ServerUtil.ConnectionFactory.
+
getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env).
+
getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+ }
+
+ /*
+ If the row is VERIFIED, remove the empty column from the row
+ */
+ @Override
+ public boolean verifyRow(List<Cell> cellList) {
+ long cellListSize = cellList.size();
+ Cell cell = null;
+ if (cellListSize == 0) {
+ return true;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ cell = cellIterator.next();
+ if (isEmptyColumn(cell)) {
+ if (Bytes.compareTo(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength(),
+ VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
+ return false;
+ }
+ // Empty column is not supposed to be returned to the
client except
+ // when it is the only column included in the scan
+ if (cellListSize > 1) {
+ cellIterator.remove();
+ }
+ return true;
+ }
+ }
+ // no empty column found
+ return false;
+ }
+
+ /*
+ Find parent link in syscat for given child link.
+ If found, mark child link row VERIFIED and start a new scan from it.
+ Otherwise, delete if row is old enough.
+ */
+ @Override
+ public void repairRow(List<Cell> row) throws IOException {
+ Cell cell = row.get(0);
+ byte[] rowKey = CellUtil.cloneRow(cell);
+ long ts = row.get(0).getTimestamp();
+
+ Scan sysCatScan = getExternalScanner();
+ childLinkScan = new Scan(scan);
+
+
+ // build syscat rowKey using given rowKey
+ byte[] sysCatRowKey = getSysCatRowKey(rowKey);
+
+ // scan syscat to find row
+ sysCatScan.withStartRow(sysCatRowKey, true);
+ sysCatScan.withStopRow(sysCatRowKey, true);
+ sysCatScan.setTimeRange(0, maxTimestamp);
+ Result result = null;
+ try (ResultScanner resultScanner =
sysCatHTable.getScanner(sysCatScan)){
+ result = resultScanner.next();
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(sysCatHTable.getName().toString(),
t);
+ }
+ // if row found, repair and verifyRowAndRemoveEmptyColumn
+ if (result != null && !result.isEmpty()) {
+ markChildLinkVerified(rowKey, ts, region);
+ scanner.close();
+ childLinkScan.withStartRow(rowKey, true);
+ scanner = region.getScanner(childLinkScan);
+ hasMore = true;
+ }
+ // if not, delete if old enough, otherwise ignore
+ else {
+ deleteIfAgedEnough(rowKey, ts, region);
+ if (restartScanDueToPageFilterRemoval) {
+ scanner.close();
+ childLinkScan.withStartRow(rowKey, true);
+ scanner = region.getScanner(childLinkScan);
+ hasMore = true;
+ restartScanDueToPageFilterRemoval = false;
+ }
+ }
+ row.clear();
+ }
+
+ /*
+ Construct row key for SYSTEM.CATALOG from a given SYSTEM.CHILD_LINK
row key
+ SYSTEM.CATALOG -> (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE,
PARENT_TENANT_ID, PARENT_FULL_NAME)
+ SYSTEM.CHILD_LINK -> (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE,
CHILD_TENANT_ID, CHILD_FULL_NAME)
+ */
+ private byte[] getSysCatRowKey(byte[] childLinkRowKey) {
+ String NULL_DELIMITER = "\0";
+ String[] childLinkRowKeyCols = new String(childLinkRowKey,
StandardCharsets.UTF_8).split(NULL_DELIMITER);
+ checkArgument(childLinkRowKeyCols.length == 5);
+ String parentTenantId = childLinkRowKeyCols[0];
+ String parentSchema = childLinkRowKeyCols[1];
+ String parentTable = childLinkRowKeyCols[2];
+ String childTenantId = childLinkRowKeyCols[3];
+ String childFullName = childLinkRowKeyCols[4];
+
+ String parentFullName = SchemaUtil.getTableName(parentSchema,
parentTable);
+ String childSchema =
SchemaUtil.getSchemaNameFromFullName(childFullName);
+ String childTable =
SchemaUtil.getTableNameFromFullName(childFullName);
+
+ String[] sysCatRowKeyCols = new String[] {childTenantId,
childSchema, childTable, parentTenantId, parentFullName};
+ return String.join(NULL_DELIMITER,
sysCatRowKeyCols).getBytes(StandardCharsets.UTF_8);
+ }
+
+
+ private void deleteIfAgedEnough(byte[] rowKey, long ts, Region region)
throws IOException {
+ if ((EnvironmentEdgeManager.currentTimeMillis() - ts) >
ageThreshold) {
+ Delete del = new Delete(rowKey);
+ Mutation[] mutations = new Mutation[]{del};
+ region.batchMutate(mutations);
+ }
+ }
+
+
+ private void markChildLinkVerified(byte[] rowKey, long ts, Region
region) throws IOException {
+ Put put = new Put(rowKey);
Review Comment:
The timestamp is taken from the first cell of the row before the repair
starts:
`long ts = row.get(0).getTimestamp();`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]