[
https://issues.apache.org/jira/browse/PHOENIX-6141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719567#comment-17719567
]
ASF GitHub Bot commented on PHOENIX-6141:
-----------------------------------------
jpisaac commented on code in PR #1575:
URL: https://github.com/apache/phoenix/pull/1575#discussion_r1185605456
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.filter.PagedFilter;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+public abstract class ReadRepairScanner extends BaseRegionScanner {
+
+ public Logger LOGGER;
+ public RegionScanner scanner;
+ public Scan scan;
+ public RegionCoprocessorEnvironment env;
+ public byte[] emptyCF;
+ public byte[] emptyCQ;
+ public Region region;
+ public boolean hasMore;
+ public long pageSizeMs;
+ public long pageSize = Long.MAX_VALUE;
+ public long rowCount = 0;
+ public long maxTimestamp;
+ public long ageThreshold;
+ public boolean restartScanDueToPageFilterRemoval = false;
+
+ /*
+ Scanner used for checking ground truth to help with read repair.
+ */
+ private Scan externalScan = null;
+ public Scan getExternalScan() { return externalScan; }
Review Comment:
Can you add some comments on how this method is meant to be used by the
framework? I do not see a setter for this attribute. Is it supposed to be
overridden by the implementing classes?
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+
+/*
+Task to run a simple select * query on SYSTEM.CHILD_LINK table to trigger read repair and verify any unverified rows.
+ */
+public class ChildLinkScanTask extends BaseTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ChildLinkScanTask.class);
+ private static final String CHILD_LINK_QUERY = "SELECT * FROM SYSTEM.CHILD_LINK";
+ private static boolean isDisabled = false;
+
+ @VisibleForTesting
+ public static void disableChildLinkScanTask(boolean disable) {
+ isDisabled = disable;
+ }
+
+ @Override
+ public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+
+ if (isDisabled) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+ "ChildLinkScan task is disabled.");
+ }
+
+ int count = 0;
+ try {
+ PhoenixConnection pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
Review Comment:
See this code snippet for an example:
https://github.com/apache/phoenix/blob/9a50b02ca6ce5fcca7f1b7975bd22d605ad84dc0/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java#L1871
##########
phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java:
##########
@@ -1333,10 +1334,22 @@ public static Long getPageSizeInMs(ReadOnlyProps props) {
return null;
}
+ private static void setScanAtrributesForChildLinkRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) {
Review Comment:
You can move this to the ChildLinkScanTask.java file. That way the
verification logic will run only when needed, instead of every time there is a
query on the CHILD_LINK table.
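The reviewer's suggestion amounts to an opt-in: only the scan issued by the task is tagged, and the server side wraps a region scanner in the read-repair scanner only when that tag is present, so ordinary queries on CHILD_LINK skip verification. A minimal sketch of that gating, assuming attribute-based tagging (HBase's `Scan` does expose `setAttribute(String, byte[])` and `getAttribute(String)`); `FakeScan`, `ChildLinkRepairGate` and the attribute name `_ChildLinkRepair` are hypothetical stand-ins, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.hbase.client.Scan:
// only the attribute map matters for this sketch.
class FakeScan {
    private final Map<String, byte[]> attributes = new HashMap<>();

    void setAttribute(String name, byte[] value) { attributes.put(name, value); }

    byte[] getAttribute(String name) { return attributes.get(name); }
}

public class ChildLinkRepairGate {
    // Hypothetical attribute name; not an existing Phoenix constant.
    static final String CHILD_LINK_REPAIR_ATTRIB = "_ChildLinkRepair";

    // Task side: tag only the scan that should trigger read repair.
    static void markForRepair(FakeScan scan) {
        scan.setAttribute(CHILD_LINK_REPAIR_ATTRIB, new byte[] {1});
    }

    // Coprocessor side: wrap the region scanner in a read-repair scanner
    // only when the tag is present; all other scans are left untouched.
    static boolean shouldRepair(FakeScan scan) {
        return scan.getAttribute(CHILD_LINK_REPAIR_ATTRIB) != null;
    }

    public static void main(String[] args) {
        FakeScan userQuery = new FakeScan();
        FakeScan taskQuery = new FakeScan();
        markForRepair(taskQuery);
        System.out.println(shouldRepair(userQuery)); // false
        System.out.println(shouldRepair(taskQuery)); // true
    }
}
```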
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java:
##########
@@ -111,4 +145,162 @@ private PhoenixMetaDataCoprocessorHost getCoprocessorHost() {
return phoenixAccessCoprocessorHost;
}
+ /**
+ * Class that verifies a given row of a SYSTEM.CHILD_LINK table.
+ * An instance of this class is created for each scanner on the table
+ * and used to verify individual rows.
+ */
+ public class ChildLinkMetaDataScanner extends ReadRepairScanner {
+
+ private Table sysCatHTable;
+ private Scan childLinkScan;
+
+ public ChildLinkMetaDataScanner(RegionCoprocessorEnvironment env,
+ Scan scan,
+ RegionScanner scanner) throws IOException {
+ super(env, scan, scanner);
+ ageThreshold = env.getConfiguration().getLong(
+ QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+ sysCatHTable = ServerUtil.ConnectionFactory.
+ getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env).
+ getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+ }
+
+ /*
+ If the row is VERIFIED, remove the empty column from the row
+ */
+ @Override
+ public boolean verifyRow(List<Cell> cellList) {
+ long cellListSize = cellList.size();
+ Cell cell = null;
+ if (cellListSize == 0) {
+ return true;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ cell = cellIterator.next();
+ if (isEmptyColumn(cell)) {
+ if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
+ return false;
+ }
+ // Empty column is not supposed to be returned to the client except
+ // when it is the only column included in the scan
+ if (cellListSize > 1) {
+ cellIterator.remove();
+ }
+ return true;
+ }
+ }
+ // no empty column found
+ return false;
+ }
+
+ /*
+ Find parent link in syscat for given child link.
+ If found, mark child link row VERIFIED and start a new scan from it.
+ Otherwise, delete if row is old enough.
+ */
+ @Override
+ public void repairRow(List<Cell> row) throws IOException {
+ Cell cell = row.get(0);
+ byte[] rowKey = CellUtil.cloneRow(cell);
+ long ts = row.get(0).getTimestamp();
+
+ Scan sysCatScan = getExternalScanner();
+ childLinkScan = new Scan(scan);
+
+
+ // build syscat rowKey using given rowKey
+ byte[] sysCatRowKey = getSysCatRowKey(rowKey);
+
+ // scan syscat to find row
+ sysCatScan.withStartRow(sysCatRowKey, true);
+ sysCatScan.withStopRow(sysCatRowKey, true);
+ sysCatScan.setTimeRange(0, maxTimestamp);
+ Result result = null;
+ try (ResultScanner resultScanner = sysCatHTable.getScanner(sysCatScan)){
+ result = resultScanner.next();
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(sysCatHTable.getName().toString(), t);
+ }
+ // if row found, repair and verifyRowAndRemoveEmptyColumn
+ if (result != null && !result.isEmpty()) {
+ markChildLinkVerified(rowKey, ts, region);
+ scanner.close();
+ childLinkScan.withStartRow(rowKey, true);
+ scanner = region.getScanner(childLinkScan);
+ hasMore = true;
+ }
+ // if not, delete if old enough, otherwise ignore
+ else {
+ deleteIfAgedEnough(rowKey, ts, region);
+ if (restartScanDueToPageFilterRemoval) {
+ scanner.close();
+ childLinkScan.withStartRow(rowKey, true);
+ scanner = region.getScanner(childLinkScan);
+ hasMore = true;
+ restartScanDueToPageFilterRemoval = false;
+ }
+ }
+ row.clear();
+ }
+
+ /*
+ Construct row key for SYSTEM.CATALOG from a given SYSTEM.CHILD_LINK row key
+ SYSTEM.CATALOG -> (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE, PARENT_TENANT_ID, PARENT_FULL_NAME)
+ SYSTEM.CHILD_LINK -> (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE, CHILD_TENANT_ID, CHILD_FULL_NAME)
+ */
+ private byte[] getSysCatRowKey(byte[] childLinkRowKey) {
+ String NULL_DELIMITER = "\0";
+ String[] childLinkRowKeyCols = new String(childLinkRowKey, StandardCharsets.UTF_8).split(NULL_DELIMITER);
+ checkArgument(childLinkRowKeyCols.length == 5);
+ String parentTenantId = childLinkRowKeyCols[0];
+ String parentSchema = childLinkRowKeyCols[1];
+ String parentTable = childLinkRowKeyCols[2];
+ String childTenantId = childLinkRowKeyCols[3];
+ String childFullName = childLinkRowKeyCols[4];
+
+ String parentFullName = SchemaUtil.getTableName(parentSchema, parentTable);
+ String childSchema = SchemaUtil.getSchemaNameFromFullName(childFullName);
+ String childTable = SchemaUtil.getTableNameFromFullName(childFullName);
+
+ String[] sysCatRowKeyCols = new String[] {childTenantId, childSchema, childTable, parentTenantId, parentFullName};
+ return String.join(NULL_DELIMITER, sysCatRowKeyCols).getBytes(StandardCharsets.UTF_8);
+ }
+
+
+ private void deleteIfAgedEnough(byte[] rowKey, long ts, Region region) throws IOException {
+ if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
+ Delete del = new Delete(rowKey);
+ Mutation[] mutations = new Mutation[]{del};
+ region.batchMutate(mutations);
+ }
+ }
+
+
+ private void markChildLinkVerified(byte[] rowKey, long ts, Region region) throws IOException {
+ Put put = new Put(rowKey);
Review Comment:
I think you may need to use the Put(byte[] row, long ts) constructor.
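The row-key translation performed by `getSysCatRowKey` in the hunk above can be exercised in isolation. Below is a minimal plain-Java sketch, assuming VARCHAR key columns are separated by a zero byte and that the `SchemaUtil` helpers behave like a simple `schema.table` join and a split on the first `.`; `fullName`, `schemaOf` and `tableOf` are hypothetical simplifications that ignore quoted or escaped names.

```java
import java.nio.charset.StandardCharsets;

public class ChildLinkKeyMapping {
    // Zero-byte separator between variable-length key columns.
    private static final String SEP = "\0";

    // Simplified, hypothetical stand-ins for the SchemaUtil helpers.
    static String fullName(String schema, String table) {
        return schema.isEmpty() ? table : schema + "." + table;
    }

    static String schemaOf(String fullName) {
        int dot = fullName.indexOf('.');
        return dot < 0 ? "" : fullName.substring(0, dot);
    }

    static String tableOf(String fullName) {
        int dot = fullName.indexOf('.');
        return dot < 0 ? fullName : fullName.substring(dot + 1);
    }

    // CHILD_LINK: (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE, CHILD_TENANT_ID, CHILD_FULL_NAME)
    // CATALOG:    (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE, PARENT_TENANT_ID, PARENT_FULL_NAME)
    static byte[] toSysCatRowKey(byte[] childLinkRowKey) {
        // A limit of -1 keeps trailing empty columns, so the count is exact.
        String[] cols = new String(childLinkRowKey, StandardCharsets.UTF_8).split(SEP, -1);
        if (cols.length != 5) {
            throw new IllegalArgumentException("expected 5 key columns, got " + cols.length);
        }
        String parentFullName = fullName(cols[1], cols[2]);
        String childFullName = cols[4];
        String[] sysCatCols = {
            cols[3], schemaOf(childFullName), tableOf(childFullName), cols[0], parentFullName
        };
        return String.join(SEP, sysCatCols).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Non-tenant link S1.T1 -> VS1.V1 (both tenant ids empty).
        byte[] link = String.join(SEP, "", "S1", "T1", "", "VS1.V1")
                .getBytes(StandardCharsets.UTF_8);
        String sysCatKey = new String(toSysCatRowKey(link), StandardCharsets.UTF_8);
        System.out.println(sysCatKey.replace(SEP, "|")); // |VS1|V1||S1.T1
    }
}
```

With the default `split` limit, trailing empty strings are dropped, so a key whose last column is empty would fail the five-column check; passing `-1` avoids that corner case in the sketch.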
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java:
##########
@@ -0,0 +1,201 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.coprocessor.tasks.ChildLinkScanTask;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class OrphanChildLinkRowsIT extends BaseTest {
+
+ private static Map<String, String> expectedChildLinks = new HashMap<>();
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, "0");
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+ // Create 2 tables - T1 and T2. Create a view V1 on T1.
+ String t1 = "CREATE TABLE IF NOT EXISTS S1.T1 (TENANT_ID VARCHAR NOT NULL, A INTEGER NOT NULL, B INTEGER CONSTRAINT PK PRIMARY KEY (TENANT_ID, A))";
+ String t2 = "CREATE TABLE IF NOT EXISTS S2.T2 (TENANT_ID VARCHAR NOT NULL, A INTEGER NOT NULL, B INTEGER CONSTRAINT PK PRIMARY KEY (TENANT_ID, A))";
+ String v1 = "CREATE VIEW IF NOT EXISTS VS1.V1 (NEW_COL1 INTEGER, NEW_COL2 INTEGER) AS SELECT * FROM S1.T1 WHERE B > 10";
+
+ try (Connection connection = DriverManager.getConnection(getUrl())) {
+ connection.createStatement().execute(t1);
+ connection.createStatement().execute(t2);
+ connection.createStatement().execute(v1);
+ }
+
+ expectedChildLinks.put("S1.T1", "VS1.V1");
+ }
+
+ /**
+ * 1. Disable the child link scan task.
+ * 2. Create a view (same name as existing view on T1) on T2. This CREATE VIEW will fail, verify if there was no orphan child link because of that.
+ *
+ * 3. Instrument CQSI to fail phase three of CREATE VIEW. Create a new view V2 on T2 (passes) and V1 on T2 which will fail.
+ * Both links T2->V2 and T2->V1 will be in UNVERIFIED state, repaired during read.
+ * Check if only 2 child links are returned: T2->V2 and T1->V1.
+ @Test
+ public void testNoOrphanChildLinkRow() throws Exception {
+
+ ConnectionQueryServicesImpl.setFailPhaseThreeChildLinkWriteForTesting(false);
+ ChildLinkScanTask.disableChildLinkScanTask(true);
+
+ String v2 = "CREATE VIEW VS1.V1 (NEW_COL1 INTEGER, NEW_COL2 INTEGER) AS SELECT * FROM S2.T2 WHERE B > 10";
+
+ try (Connection connection = DriverManager.getConnection(getUrl())) {
+ connection.createStatement().execute(v2);
+ }
+ catch (TableAlreadyExistsException e) {
Review Comment:
I see, so we want to swallow the exception.
> Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK
> ---------------------------------------------------------------
>
> Key: PHOENIX-6141
> URL: https://issues.apache.org/jira/browse/PHOENIX-6141
> Project: Phoenix
> Issue Type: Improvement
> Affects Versions: 5.0.0, 4.15.0
> Reporter: Chinmay Kulkarni
> Assignee: Palash Chauhan
> Priority: Blocker
> Fix For: 5.2.0, 5.1.4
>
>
> Before 4.15, "CREATE/DROP VIEW" was an atomic operation, since we were issuing
> batch mutations on a single SYSTEM.CATALOG region. In 4.15 we introduced
> SYSTEM.CHILD_LINK to store the parent->child links, so a CREATE VIEW is no
> longer atomic: it consists of 2 separate RPCs (one to SYSTEM.CHILD_LINK to
> add the linking row and another to SYSTEM.CATALOG to write metadata for the
> new view).
> If the second RPC, i.e. the RPC that writes metadata to SYSTEM.CATALOG, fails
> after the first RPC has already gone through, the two metadata tables become
> inconsistent: we will see orphan parent->child linking rows in
> SYSTEM.CHILD_LINK. This can cause the following issues:
> # ALTER TABLE calls on the base table will fail
> # DROP TABLE without CASCADE will fail
> # The upgrade path has calls like UpgradeUtil.upgradeTable() which will fail
> # Any metadata consistency checks can be thrown off
> # Unnecessary extra storage of orphan links
> The first 3 issues happen because we wrongly deduce that a base table has
> child views due to the orphan linking rows.
> This Jira aims to find a way to make mutations across SYSTEM.CATALOG and
> SYSTEM.CHILD_LINK an atomic transaction. We can use a 2-phase commit approach
> like the one in global indexing, or potentially explore using a transaction
> manager.
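The write/read protocol that the PR above builds on this idea can be modeled in a few lines: the CHILD_LINK row is written UNVERIFIED first, SYSTEM.CATALOG is written second, and the link is marked VERIFIED last ("phase three" in the IT). Readers never return an UNVERIFIED link as-is; they repair it if SYSTEM.CATALOG has the matching row, or delete it once it is older than the age threshold. This is a toy in-memory model under loud simplifying assumptions (both tables keyed by the same `parent->child` string, no concurrency, hypothetical class and method names), not Phoenix code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ChildLinkTwoPhaseModel {
    enum State { UNVERIFIED, VERIFIED }

    static final class LinkRow {
        State state;
        final long ts;
        LinkRow(State state, long ts) { this.state = state; this.ts = ts; }
    }

    // Simplification: both "tables" are keyed by the same "parent->child" string.
    final Map<String, LinkRow> childLink = new HashMap<>();
    final Set<String> catalog = new HashSet<>();
    final long ageThresholdMs;

    ChildLinkTwoPhaseModel(long ageThresholdMs) { this.ageThresholdMs = ageThresholdMs; }

    // failAfterPhase simulates a failed RPC after phase 1 or 2 (0 = all phases succeed).
    void createView(String link, long now, int failAfterPhase) {
        childLink.put(link, new LinkRow(State.UNVERIFIED, now)); // phase 1
        if (failAfterPhase == 1) return;
        catalog.add(link);                                       // phase 2
        if (failAfterPhase == 2) return;
        childLink.get(link).state = State.VERIFIED;              // phase 3
    }

    // Read path: return only VERIFIED links, repairing or deleting UNVERIFIED ones.
    Set<String> scan(long now) {
        Set<String> visible = new HashSet<>();
        Iterator<Map.Entry<String, LinkRow>> it = childLink.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, LinkRow> e = it.next();
            LinkRow row = e.getValue();
            if (row.state == State.UNVERIFIED) {
                if (catalog.contains(e.getKey())) {
                    row.state = State.VERIFIED; // ground truth found: repair the link
                } else {
                    if (now - row.ts > ageThresholdMs) {
                        it.remove();            // orphan old enough: delete it
                    }
                    continue;                   // never return an unverified link
                }
            }
            visible.add(e.getKey());
        }
        return visible;
    }

    public static void main(String[] args) {
        ChildLinkTwoPhaseModel model = new ChildLinkTwoPhaseModel(0);
        model.createView("T1->V1", 100, 0); // fully committed
        model.createView("T2->V2", 100, 2); // catalog written, phase 3 failed
        model.createView("T2->V1", 100, 1); // catalog write failed: orphan link
        System.out.println(new TreeSet<>(model.scan(200))); // [T1->V1, T2->V2]
        System.out.println(model.childLink.size());         // 2
    }
}
```

The `main` method loosely mirrors the `OrphanChildLinkRowsIT` scenario: with a zero age threshold, the repairable link T2->V2 is returned and the orphan T2->V1 is removed on read.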
--
This message was sent by Atlassian Jira
(v8.20.10#820010)