[ 
https://issues.apache.org/jira/browse/PHOENIX-6141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719567#comment-17719567
 ] 

ASF GitHub Bot commented on PHOENIX-6141:
-----------------------------------------

jpisaac commented on code in PR #1575:
URL: https://github.com/apache/phoenix/pull/1575#discussion_r1185605456


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.filter.PagedFilter;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+public abstract class ReadRepairScanner extends BaseRegionScanner {
+
+    public Logger LOGGER;
+    public RegionScanner scanner;
+    public Scan scan;
+    public RegionCoprocessorEnvironment env;
+    public byte[] emptyCF;
+    public byte[] emptyCQ;
+    public Region region;
+    public boolean hasMore;
+    public long pageSizeMs;
+    public long pageSize = Long.MAX_VALUE;
+    public long rowCount = 0;
+    public long maxTimestamp;
+    public long ageThreshold;
+    public boolean restartScanDueToPageFilterRemoval = false;
+
+    /*
+    Scanner used for checking ground truth to help with read repair.
+     */
+    private Scan externalScan = null;
+    public Scan getExternalScan() { return externalScan; }

Review Comment:
   Can you add a comment explaining how the framework is expected to use this 
method? I do not see a setter for this attribute. Is it supposed to be 
overridden by the implementing classes?
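
One possible way to answer the question above, shown only as a sketch: document the getter's contract and give subclasses an explicit way to supply the scan. `ScanStub`, `ReadRepairScannerSketch`, `ChildLinkScannerSketch`, and `setExternalScan` are hypothetical names introduced for illustration; `ScanStub` stands in for the HBase `Scan` type so the example compiles on its own. None of this is taken from the PR.

```java
import java.util.Objects;

/** Stand-in for org.apache.hadoop.hbase.client.Scan so the sketch compiles on its own. */
final class ScanStub {
    final String targetTable;

    ScanStub(String targetTable) {
        this.targetTable = targetTable;
    }
}

/** Hypothetical base class; only the external-scan contract is modeled. */
abstract class ReadRepairScannerSketch {
    private ScanStub externalScan;

    /**
     * Scan used to look up the ground-truth row during repair.
     * Returns null until a subclass has called setExternalScan().
     */
    public ScanStub getExternalScan() {
        return externalScan;
    }

    /** Hypothetical setter: subclasses supply the scan once, e.g. from their constructor. */
    protected void setExternalScan(ScanStub scan) {
        this.externalScan = Objects.requireNonNull(scan, "scan");
    }
}

/** Example subclass that points the external scan at SYSTEM.CATALOG. */
final class ChildLinkScannerSketch extends ReadRepairScannerSketch {
    ChildLinkScannerSketch() {
        setExternalScan(new ScanStub("SYSTEM.CATALOG"));
    }
}
```

With a protected setter the attribute has a single, documented write path; making the getter abstract and overriding it in each subclass would be an equally reasonable alternative.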



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+
+/*
+Task to run a simple select * query on SYSTEM.CHILD_LINK table to trigger read repair and verify any unverified rows.
+ */
+public class ChildLinkScanTask extends BaseTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ChildLinkScanTask.class);
+    private static final String CHILD_LINK_QUERY = "SELECT * FROM SYSTEM.CHILD_LINK";
+    private static boolean isDisabled = false;
+
+    @VisibleForTesting
+    public static void disableChildLinkScanTask(boolean disable) {
+        isDisabled = disable;
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+
+        if (isDisabled) {
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "ChildLinkScan task is disabled.");
+        }
+
+        int count = 0;
+        try {
+            PhoenixConnection pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);

Review Comment:
   For example, see this code snippet: 
https://github.com/apache/phoenix/blob/9a50b02ca6ce5fcca7f1b7975bd22d605ad84dc0/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java#L1871



##########
phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java:
##########
@@ -1333,10 +1334,22 @@ public static Long getPageSizeInMs(ReadOnlyProps props) {
         return null;
     }
 
+    private static void setScanAtrributesForChildLinkRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) {

Review Comment:
   You can move this to the ChildLinkScanTask.java file. That way, the 
verification logic runs only when needed instead of every time there is a 
query on the CHILD_LINK table.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java:
##########
@@ -111,4 +145,162 @@ private PhoenixMetaDataCoprocessorHost getCoprocessorHost() {
                return phoenixAccessCoprocessorHost;
        }
 
+    /**
+     * Class that verifies a given row of a SYSTEM.CHILD_LINK table.
+     * An instance of this class is created for each scanner on the table
+     * and used to verify individual rows.
+     */
+    public class ChildLinkMetaDataScanner extends ReadRepairScanner {
+
+        private Table sysCatHTable;
+        private Scan childLinkScan;
+
+        public ChildLinkMetaDataScanner(RegionCoprocessorEnvironment env,
+                                        Scan scan,
+                                        RegionScanner scanner) throws IOException {
+            super(env, scan, scanner);
+            ageThreshold = env.getConfiguration().getLong(
+                    QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+            sysCatHTable = ServerUtil.ConnectionFactory.
+                    getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env).
+                    getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+        }
+
+        /*
+        If the row is VERIFIED, remove the empty column from the row
+         */
+        @Override
+        public boolean verifyRow(List<Cell> cellList) {
+            long cellListSize = cellList.size();
+            Cell cell = null;
+            if (cellListSize == 0) {
+                return true;
+            }
+            Iterator<Cell> cellIterator = cellList.iterator();
+            while (cellIterator.hasNext()) {
+                cell = cellIterator.next();
+                if (isEmptyColumn(cell)) {
+                    if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                            VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
+                        return false;
+                    }
+                    // Empty column is not supposed to be returned to the client except
+                    // when it is the only column included in the scan
+                    if (cellListSize > 1) {
+                        cellIterator.remove();
+                    }
+                    return true;
+                }
+            }
+            // no empty column found
+            return false;
+        }
+
+        /*
+        Find parent link in syscat for given child link.
+        If found, mark child link row VERIFIED and start a new scan from it.
+        Otherwise, delete if row is old enough.
+         */
+        @Override
+        public void repairRow(List<Cell> row) throws IOException {
+            Cell cell = row.get(0);
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            long ts = row.get(0).getTimestamp();
+
+            Scan sysCatScan = getExternalScanner();
+            childLinkScan = new Scan(scan);
+
+
+            // build syscat rowKey using given rowKey
+            byte[] sysCatRowKey = getSysCatRowKey(rowKey);
+
+            // scan syscat to find row
+            sysCatScan.withStartRow(sysCatRowKey, true);
+            sysCatScan.withStopRow(sysCatRowKey, true);
+            sysCatScan.setTimeRange(0, maxTimestamp);
+            Result result = null;
+            try (ResultScanner resultScanner = sysCatHTable.getScanner(sysCatScan)){
+                result = resultScanner.next();
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(sysCatHTable.getName().toString(), t);
+            }
+            // if row found, repair and verifyRowAndRemoveEmptyColumn
+            if (result != null && !result.isEmpty()) {
+                markChildLinkVerified(rowKey, ts, region);
+                scanner.close();
+                childLinkScan.withStartRow(rowKey, true);
+                scanner = region.getScanner(childLinkScan);
+                hasMore = true;
+            }
+            // if not, delete if old enough, otherwise ignore
+            else {
+                deleteIfAgedEnough(rowKey, ts, region);
+                if (restartScanDueToPageFilterRemoval) {
+                    scanner.close();
+                    childLinkScan.withStartRow(rowKey, true);
+                    scanner = region.getScanner(childLinkScan);
+                    hasMore = true;
+                    restartScanDueToPageFilterRemoval = false;
+                }
+            }
+            row.clear();
+        }
+
+        /*
+        Construct row key for SYSTEM.CATALOG from a given SYSTEM.CHILD_LINK row key
+        SYSTEM.CATALOG -> (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE, PARENT_TENANT_ID, PARENT_FULL_NAME)
+        SYSTEM.CHILD_LINK -> (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE, CHILD_TENANT_ID, CHILD_FULL_NAME)
+         */
+        private byte[] getSysCatRowKey(byte[] childLinkRowKey) {
+            String NULL_DELIMITER = "\0";
+            String[] childLinkRowKeyCols = new String(childLinkRowKey, StandardCharsets.UTF_8).split(NULL_DELIMITER);
+            checkArgument(childLinkRowKeyCols.length == 5);
+            String parentTenantId = childLinkRowKeyCols[0];
+            String parentSchema = childLinkRowKeyCols[1];
+            String parentTable = childLinkRowKeyCols[2];
+            String childTenantId = childLinkRowKeyCols[3];
+            String childFullName = childLinkRowKeyCols[4];
+
+            String parentFullName = SchemaUtil.getTableName(parentSchema, parentTable);
+            String childSchema = SchemaUtil.getSchemaNameFromFullName(childFullName);
+            String childTable = SchemaUtil.getTableNameFromFullName(childFullName);
+
+            String[] sysCatRowKeyCols = new String[] {childTenantId, childSchema, childTable, parentTenantId, parentFullName};
+            return String.join(NULL_DELIMITER, sysCatRowKeyCols).getBytes(StandardCharsets.UTF_8);
+        }
+
+
+        private void deleteIfAgedEnough(byte[] rowKey, long ts, Region region) throws IOException {
+            if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
+                Delete del = new Delete(rowKey);
+                Mutation[] mutations = new Mutation[]{del};
+                region.batchMutate(mutations);
+            }
+        }
+
+
+        private void markChildLinkVerified(byte[] rowKey, long ts, Region region) throws IOException {
+            Put put = new Put(rowKey);

Review Comment:
   I think you may need to use the Put(byte[] row, long ts) constructor. 
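
One plausible reason for this suggestion (an assumption, not stated in the review): a `Put` built without a timestamp is stamped by the region server with the current time, so the VERIFIED marker could land outside the time range of the scan that is restarted after the repair. The toy model below uses plain Java collections rather than the HBase API; `VersionedCellModel`, `readAfterRepair`, and the timestamps 100, 150, and 200 are hypothetical and chosen only for illustration.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model of one versioned cell (the empty column of a single row). */
final class VersionedCellModel {
    // timestamp -> value, mimicking multiple versions of the same cell
    private final NavigableMap<Long, String> versions = new TreeMap<>();

    void put(long ts, String value) {
        versions.put(ts, value);
    }

    /** Newest value whose timestamp is strictly below maxTsExclusive, or null if none. */
    String read(long maxTsExclusive) {
        Map.Entry<Long, String> newest = versions.lowerEntry(maxTsExclusive);
        return newest == null ? null : newest.getValue();
    }

    /** Writes the row at ts=100, writes the marker at markerTs, then reads with upper bound 150. */
    static String readAfterRepair(long markerTs) {
        VersionedCellModel cell = new VersionedCellModel();
        cell.put(100L, "UNVERIFIED");   // original child link write
        cell.put(markerTs, "VERIFIED"); // repair marker
        return cell.read(150L);         // restarted scan keeps its original time range
    }

    public static void main(String[] args) {
        System.out.println(readAfterRepair(200L)); // marker stamped with a later "now"
        System.out.println(readAfterRepair(100L)); // marker stamped with the row's own ts
    }
}
```

In this model the marker written at 200 is invisible to a read bounded at 150, so the row still looks UNVERIFIED, while the marker written at the row's own timestamp is seen. Whether the real scanner behaves this way depends on the time range carried by the restarted scan.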



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java:
##########
@@ -0,0 +1,201 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.coprocessor.tasks.ChildLinkScanTask;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class OrphanChildLinkRowsIT extends BaseTest {
+
+    private static Map<String, String> expectedChildLinks = new HashMap<>();
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, "0");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+        // Create 2 tables - T1 and T2. Create a view V1 on T1.
+        String t1 = "CREATE TABLE IF NOT EXISTS S1.T1 (TENANT_ID VARCHAR NOT NULL, A INTEGER NOT NULL, B INTEGER CONSTRAINT PK PRIMARY KEY (TENANT_ID, A))";
+        String t2 = "CREATE TABLE IF NOT EXISTS S2.T2 (TENANT_ID VARCHAR NOT NULL, A INTEGER NOT NULL, B INTEGER CONSTRAINT PK PRIMARY KEY (TENANT_ID, A))";
+        String v1 = "CREATE VIEW IF NOT EXISTS VS1.V1 (NEW_COL1 INTEGER, NEW_COL2 INTEGER) AS SELECT * FROM S1.T1 WHERE B > 10";
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(t1);
+            connection.createStatement().execute(t2);
+            connection.createStatement().execute(v1);
+        }
+
+        expectedChildLinks.put("S1.T1", "VS1.V1");
+    }
+
+    /**
+     * 1. Disable the child link scan task.
+     * 2. Create a view (same name as existing view on T1) on T2. This CREATE VIEW will fail, verify if there was no orphan child link because of that.
+     *
+     * 3. Instrument CQSI to fail phase three of CREATE VIEW. Create a new view V2 on T2 (passes) and V1 on T2 which will fail.
+     *    Both links T2->V2 and T2->V1 will be in UNVERIFIED state, repaired during read.
+     *    Check if only 2 child links are returned: T2->V2 and T1->V1.
+     */
+    @Test
+    public void testNoOrphanChildLinkRow() throws Exception {
+
+        ConnectionQueryServicesImpl.setFailPhaseThreeChildLinkWriteForTesting(false);
+        ChildLinkScanTask.disableChildLinkScanTask(true);
+
+        String v2 = "CREATE VIEW VS1.V1 (NEW_COL1 INTEGER, NEW_COL2 INTEGER) AS SELECT * FROM S2.T2 WHERE B > 10";
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(v2);
+        }
+        catch (TableAlreadyExistsException e) {

Review Comment:
   I see, so we want to swallow the exception.





> Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK
> ---------------------------------------------------------------
>
>                 Key: PHOENIX-6141
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6141
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.0.0, 4.15.0
>            Reporter: Chinmay Kulkarni
>            Assignee: Palash Chauhan
>            Priority: Blocker
>             Fix For: 5.2.0, 5.1.4
>
>
> Before 4.15, "CREATE/DROP VIEW" was an atomic operation since we were issuing 
> batch mutations on just the 1 SYSTEM.CATALOG region. In 4.15 we introduced 
> SYSTEM.CHILD_LINK to store the parent->child links and so a CREATE VIEW is no 
> longer atomic since it consists of 2 separate RPCs  (1 to SYSTEM.CHILD_LINK 
> to add the linking row and another to SYSTEM.CATALOG to write metadata for 
> the new view). 
> If the second RPC i.e. the RPC to write metadata to SYSTEM.CATALOG fails 
> after the 1st RPC has already gone through, there will be an inconsistency 
> between both metadata tables. We will see orphan parent->child linking rows 
> in SYSTEM.CHILD_LINK in this case. This can cause the following issues:
> # ALTER TABLE calls on the base table will fail
> # DROP TABLE without CASCADE will fail
> # The upgrade path has calls like UpgradeUtil.upgradeTable() which will fail
> # Any metadata consistency checks can be thrown off
> # Unnecessary extra storage of orphan links
> The first 3 issues happen because we wrongly deduce that a base table has 
> child views due to the orphan linking rows.
> This Jira aims at trying to come up with a way to make mutations among 
> SYSTEM.CATALOG and SYSTEM.CHILD_LINK an atomic transaction. We can use a 
> 2-phase commit approach like in global indexing or also potentially explore 
> using a transaction manager. 
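
The 2-phase idea mentioned in the description, combined with the read repair discussed in the review above, can be modeled roughly as follows. This is a minimal in-memory sketch under stated assumptions, not Phoenix code: `TwoPhaseChildLinkModel`, its methods, and the `parent->child` string keys are hypothetical, and the age threshold is treated as zero so that orphan links are deleted on the first read.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** In-memory model of UNVERIFIED/VERIFIED child links with repair on read. */
final class TwoPhaseChildLinkModel {
    enum State { UNVERIFIED, VERIFIED }

    // Models SYSTEM.CHILD_LINK: "parent->child" link rows and their verification state.
    private final Map<String, State> childLink = new TreeMap<>();
    // Models the parent link rows that CREATE VIEW writes to SYSTEM.CATALOG.
    private final Set<String> catalog = new HashSet<>();

    /** Returns true if the view was fully created; failCatalogWrite simulates a failed second RPC. */
    boolean createView(String parent, String child, boolean failCatalogWrite) {
        String link = parent + "->" + child;
        childLink.put(link, State.UNVERIFIED);   // phase 1: write the link as UNVERIFIED
        if (failCatalogWrite) {
            return false;                        // an orphan UNVERIFIED link is left behind
        }
        catalog.add(link);                       // phase 2: write the view metadata
        childLink.put(link, State.VERIFIED);     // phase 3: mark the link VERIFIED
        return true;
    }

    /** Returns the visible links, repairing or deleting UNVERIFIED rows along the way. */
    Set<String> readChildLinks() {
        Set<String> visible = new TreeSet<>();
        Iterator<Map.Entry<String, State>> it = childLink.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, State> row = it.next();
            if (row.getValue() == State.UNVERIFIED) {
                if (!catalog.contains(row.getKey())) {
                    it.remove();                 // no matching catalog row: drop the orphan
                    continue;
                }
                row.setValue(State.VERIFIED);    // catalog row exists: repair the link
            }
            visible.add(row.getKey());
        }
        return visible;
    }
}
```

In this model a reader never observes an orphan link: a failed CREATE VIEW leaves only an UNVERIFIED row, which the next read removes.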



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
