tkhurana commented on code in PR #1575:
URL: https://github.com/apache/phoenix/pull/1575#discussion_r1237829993
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java:
##########
@@ -111,4 +143,197 @@ private PhoenixMetaDataCoprocessorHost
getCoprocessorHost() {
return phoenixAccessCoprocessorHost;
}
+ /**
+ * Class that verifies a given row of a SYSTEM.CHILD_LINK table.
+ * An instance of this class is created for each scanner on the table
+ * and used to verify individual rows.
+ * A row that fails verification is repaired by looking up the matching
+ * child->parent link row and view header row in SYSTEM.CATALOG: when both
+ * exist the child link row is marked VERIFIED, otherwise it is deleted
+ * once it is older than the configured age threshold.
+ */
+ public class ChildLinkMetaDataScanner extends ReadRepairScanner {
+ /*
+ sysCatHTable is the table handle used for all SYSTEM.CATALOG lookups.
+ sysCatViewHeaderRowScan and sysCatChildParentLinkScan are created once
+ in the constructor and re-targeted at a new row key on every lookup;
+ childLinkScan is rebuilt from the original scan on every repair.
+ */
+ private Table sysCatHTable;
+ private Scan childLinkScan;
+ private Scan sysCatViewHeaderRowScan;
+ private Scan sysCatChildParentLinkScan;
+ private final String NULL_DELIMITER = "\0";
+ /*
+ Sets up the two SYSTEM.CATALOG scans, reads the row age threshold from
+ the region configuration (falling back to the default from
+ QueryServicesOptions) and obtains a Table for
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.
+ NOTE(review): nothing in this hunk closes sysCatHTable - confirm that
+ the parent scanner or the connection factory owns its lifetime.
+ */
+ public ChildLinkMetaDataScanner(RegionCoprocessorEnvironment env,
+ Scan scan,
+ RegionScanner scanner) throws
IOException {
+ super(env, scan, scanner);
+ sysCatChildParentLinkScan = new Scan();
+ setSysCatViewHeaderRowScan();
+ ageThreshold = env.getConfiguration().getLong(
+
QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+
QueryServicesOptions.DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+ sysCatHTable = ServerUtil.ConnectionFactory.
+
getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env).
+
getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+ }
+
+ /*
+ If the row is VERIFIED, remove the empty column from the row
+ Returns true for an empty cell list, or when the empty column cell is
+ present and its value equals VERIFIED_BYTES; in the latter case the
+ empty column cell is removed from the list unless it is the only cell.
+ Returns false when the empty column holds any other value or when no
+ empty column cell is found at all.
+ */
+ @Override
+ public boolean verifyRow(List<Cell> cellList) {
+ long cellListSize = cellList.size();
+ Cell cell = null;
+ if (cellListSize == 0) {
+ return true;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ cell = cellIterator.next();
+ if (isEmptyColumn(cell)) {
+ if (Bytes.compareTo(
+ cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength(),
+ VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0
+ ) {
+ return false;
+ }
+ // Empty column is not supposed to be returned to the
client except
+ // when it is the only column included in the scan
+ if (cellListSize > 1) {
+ cellIterator.remove();
+ }
+ return true;
+ }
+ }
+ // no empty column found
+ return false;
+ }
+
+ /*
+ Find parent link in syscat for given child link.
+ The view header row is looked up only when the child->parent link row
+ is present; both must exist for the child link to count as found.
+ If found, mark child link row VERIFIED, close the current scanner and
+ start a new scan from that row key (inclusive).
+ Otherwise, delete if row is old enough.
+ In both cases the passed-in row list is cleared before returning.
+ The row key and timestamp are taken from the first cell of the row.
+ */
+ @Override
+ public void repairRow(List<Cell> row) throws IOException {
+ Cell cell = row.get(0);
+ byte[] rowKey = CellUtil.cloneRow(cell);
+ long ts = row.get(0).getTimestamp();
+
+ childLinkScan = new Scan(scan);
+
+ boolean isChildParentLinkPresent =
isRowPresentInSysCat(sysCatChildParentLinkScan,
+
getChildParentLinkSysCatRowKey(rowKey));
+
+ boolean isViewHeaderRowPresent = false;
+ if (isChildParentLinkPresent) {
+ isViewHeaderRowPresent =
isRowPresentInSysCat(sysCatViewHeaderRowScan,
+
getViewHeaderSysCatRowKey(rowKey));
+ }
+ // if row found, repair and verifyRowAndRemoveEmptyColumn
+ if (isChildParentLinkPresent && isViewHeaderRowPresent) {
+ markChildLinkVerified(rowKey, ts, region);
+ scanner.close();
+ childLinkScan.withStartRow(rowKey, true);
+ scanner = region.getScanner(childLinkScan);
+ hasMore = true;
+ }
+ // if not, delete if old enough, otherwise ignore
+ else {
+ deleteIfAgedEnough(rowKey, ts, region);
+ }
+ row.clear();
+ }
+ /*
+ Points the given scan at exactly rowKey (start and stop row both
+ inclusive), sets its time range to (0, maxTimestamp) and runs it on
+ sysCatHTable. Returns true when the first Result is non-null and
+ non-empty. Any Throwable raised while scanning is handed to
+ ServerUtil.throwIOException together with the table name.
+ Note that the passed-in Scan object is modified in place.
+ */
+ private boolean isRowPresentInSysCat(Scan scan, byte[] rowKey) throws
IOException {
+ scan.withStartRow(rowKey, true);
+ scan.withStopRow(rowKey, true);
+ scan.setTimeRange(0, maxTimestamp);
+
+ Result result = null;
+ try (ResultScanner resultScanner = sysCatHTable.getScanner(scan)) {
+ result = resultScanner.next();
+ }
+ catch (Throwable t) {
+ ServerUtil.throwIOException(sysCatHTable.getName().toString(),
t);
+ }
+
+ return (result != null) && (!result.isEmpty());
+ }
+
+ /*
+ Construct row key for SYSTEM.CATALOG view header row from a given
SYSTEM.CHILD_LINK row key
+ The child link key is decoded as UTF-8 and split on the null byte; it
+ must yield exactly 5 parts (checkArgument fails otherwise). Parts 4 and
+ 5 (child tenant id, child full name) are used, and the result is
+ (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE) joined by null bytes.
+ */
+ private byte[] getViewHeaderSysCatRowKey(byte[] childLinkRowKey) {
+ String[] childLinkRowKeyCols = new String(childLinkRowKey,
StandardCharsets.UTF_8)
+ .split(NULL_DELIMITER);
+ checkArgument(childLinkRowKeyCols.length == 5);
+ String childTenantId = childLinkRowKeyCols[3];
+ String childFullName = childLinkRowKeyCols[4];
+
+ String childSchema =
SchemaUtil.getSchemaNameFromFullName(childFullName);
+ String childTable =
SchemaUtil.getTableNameFromFullName(childFullName);
+
+ String[] sysCatRowKeyCols = new String[] {childTenantId,
childSchema, childTable};
+ return String.join(NULL_DELIMITER,
sysCatRowKeyCols).getBytes(StandardCharsets.UTF_8);
+ }
+
+ /*
+ Construct row key for SYSTEM.CATALOG parent->child link from a given
SYSTEM.CHILD_LINK row key
+ SYSTEM.CATALOG -> (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE,
PARENT_TENANT_ID, PARENT_FULL_NAME)
+ SYSTEM.CHILD_LINK -> (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE,
CHILD_TENANT_ID, CHILD_FULL_NAME)
+ The child link key is decoded as UTF-8 and split on the null byte; it
+ must yield exactly 5 parts (checkArgument fails otherwise). The parent
+ full name is rebuilt from parent schema and table via
+ SchemaUtil.getTableName, and the child full name is split back into
+ schema and table.
+ */
+ private byte[] getChildParentLinkSysCatRowKey(byte[] childLinkRowKey) {
+ String[] childLinkRowKeyCols = new String(childLinkRowKey,
StandardCharsets.UTF_8)
+ .split(NULL_DELIMITER);
+ checkArgument(childLinkRowKeyCols.length == 5);
+ String parentTenantId = childLinkRowKeyCols[0];
+ String parentSchema = childLinkRowKeyCols[1];
+ String parentTable = childLinkRowKeyCols[2];
+ String childTenantId = childLinkRowKeyCols[3];
+ String childFullName = childLinkRowKeyCols[4];
+
+ String parentFullName = SchemaUtil.getTableName(parentSchema,
parentTable);
+ String childSchema =
SchemaUtil.getSchemaNameFromFullName(childFullName);
+ String childTable =
SchemaUtil.getTableNameFromFullName(childFullName);
+
+ String[] sysCatRowKeyCols = new String[] {childTenantId,
childSchema, childTable,
+ parentTenantId,
parentFullName};
+ return String.join(NULL_DELIMITER,
sysCatRowKeyCols).getBytes(StandardCharsets.UTF_8);
+ }
+
+ /*
+ Deletes the row through region.batchMutate when the current time minus
+ the row timestamp ts is strictly greater than ageThreshold; otherwise
+ the row is left untouched.
+ */
+ private void deleteIfAgedEnough(byte[] rowKey, long ts, Region region)
throws IOException {
+ if ((EnvironmentEdgeManager.currentTimeMillis() - ts) >
ageThreshold) {
+ Delete del = new Delete(rowKey);
+ Mutation[] mutations = new Mutation[]{del};
+ region.batchMutate(mutations);
+ }
+ }
+
+ /*
+ Writes VERIFIED_BYTES into the empty column (emptyCF, emptyCQ) of the
+ given row through region.batchMutate, reusing the row's own timestamp
+ ts for the Put.
+ */
+ private void markChildLinkVerified(byte[] rowKey, long ts, Region
region) throws IOException {
+ Put put = new Put(rowKey, ts);
+ put.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+ Mutation[] mutations = new Mutation[]{put};
+ region.batchMutate(mutations);
+ }
+ /*
+ Builds the scan used for view header lookups: it selects only the
+ TABLE_FAMILY_BYTES:TABLE_TYPE_BYTES column and filters on that column
+ being EQUAL to the serialized value of PTableType.VIEW.
+ NOTE(review): getBytes() is called without a charset, so it uses the
+ platform default - confirm the serialized value is always ASCII.
+ */
+ private void setSysCatViewHeaderRowScan() {
+ sysCatViewHeaderRowScan = new Scan();
+ sysCatViewHeaderRowScan.addColumn(TABLE_FAMILY_BYTES,
TABLE_TYPE_BYTES);
+ SingleColumnValueFilter tableTypeFilter = new
SingleColumnValueFilter(
+ TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES,
CompareOperator.EQUAL,
+ PTableType.VIEW.getSerializedValue().getBytes());
+ sysCatViewHeaderRowScan.setFilter(tableTypeFilter);
+ }
+
+ }
+ /* Exposes this endpoint itself as the RegionObserver, wrapped in an Optional. */
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public RegionScanner
postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Scan scan, RegionScanner s) throws
IOException {
+ if (scan.getAttribute(CHECK_VERIFY_COLUMN) == null) {
+ return s;
+ }
+ return new ChildLinkMetaDataScanner(c.getEnvironment(), scan, s);
Review Comment:
You need to wrap the scanner `s` in a PagingRegionScanner to get paging to work.
`return new ChildLinkMetaDataScanner(c.getEnvironment(), scan,
new PagingRegionScanner(c.getEnvironment().getRegion(), s,
scan));`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]