[ 
https://issues.apache.org/jira/browse/PHOENIX-6141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736341#comment-17736341
 ] 

ASF GitHub Bot commented on PHOENIX-6141:
-----------------------------------------

palashc commented on code in PR #1575:
URL: https://github.com/apache/phoenix/pull/1575#discussion_r1239276896


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java:
##########
@@ -0,0 +1,196 @@
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.filter.PagedFilter;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+public abstract class ReadRepairScanner extends BaseRegionScanner {
+
    // Logger for the concrete subclass; initialized in the constructor via getClass().
    public Logger LOGGER;
    // Delegate scanner whose rows are verified and, when necessary, repaired.
    public RegionScanner scanner;
    // The client scan; carries the empty-column attributes and paging settings.
    public Scan scan;
    // Coprocessor environment of the region being scanned.
    public RegionCoprocessorEnvironment env;
    // Empty column family name, read from the scan attributes.
    public byte[] emptyCF;
    // Empty column qualifier name, read from the scan attributes.
    public byte[] emptyCQ;
    // The region this scanner operates on (from env.getRegion()).
    public Region region;
    // Whether the delegate scanner reported more rows on the last next() call.
    public boolean hasMore;
    // Time budget (ms) for a single next() call before returning a dummy row.
    public long pageSizeMs;
    // Page size recovered from a removed PageFilter; enforced manually via rowCount.
    public long pageSize = Long.MAX_VALUE;
    // Number of verified rows returned so far; compared against pageSize.
    public long rowCount = 0;
    // Upper bound of the scan's time range.
    public long maxTimestamp;
    // NOTE(review): not assigned anywhere in the visible code — confirm intended use.
    public long ageThreshold;
    // Set when a PageFilter was stripped from the scan, signalling a restart is needed.
    public boolean restartScanDueToPageFilterRemoval = false;

    /*
    Scanner used for checking ground truth to help with read repair.
     */
    private Scan externalScanner = null;
    public Scan getExternalScanner() { return externalScanner; }
+
+    public ReadRepairScanner(RegionCoprocessorEnvironment env, Scan scan, 
RegionScanner scanner) {
+        super(scanner);
+        LOGGER = LoggerFactory.getLogger(this.getClass());
+        this.env = env;
+        this.scan = scan;
+        this.scanner = scanner;
+        region = env.getRegion();
+        emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+        emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+        pageSizeMs = getPageSizeMsForRegionScanner(scan);
+        maxTimestamp = scan.getTimeRange().getMax();
+    }
+
+
+    /*
+    Method which checks whether a row is VERIFIED (i.e. does not need repair).
+     */
+    abstract boolean verifyRow(List<Cell> row);
+
+    /*
+    Method which repairs the given row
+     */
+    abstract void repairRow(List<Cell> row) throws IOException;
+
    /**
     * Fetches the next verified row from the delegate scanner, repairing or
     * skipping unverified rows along the way.
     *
     * @param result output list; filled with the cells of the next valid row,
     *               a dummy row when the time budget is exhausted, or left
     *               empty when the scan is done
     * @param raw    whether to use {@code nextRaw} or {@code next} on the
     *               delegate scanner
     * @return true if more rows may follow, false when the scan (or the page)
     *         is exhausted
     * @throws IOException if the delegate scan or the repair fails
     */
    public boolean next(List<Cell> result, boolean raw) throws IOException {
        try {
            long startTime = EnvironmentEdgeManager.currentTimeMillis();
            do {
                if (raw) {
                    hasMore = scanner.nextRaw(result);
                } else {
                    hasMore = scanner.next(result);
                }
                // No cells: nothing left in this batch, report delegate status.
                if (result.isEmpty()) {
                    return hasMore;
                }
                // Dummy rows from lower layers are passed through untouched so
                // the client keeps the scanner alive without timing out.
                if (isDummy(result)) {
                    return true;
                }
                Cell cell = result.get(0);
                // Row verified (possibly after an in-place repair): return it.
                if (verifyRowAndRepairIfNecessary(result)) {
                    break;
                }
                // Row was invalid and dropped. If we have exceeded the time
                // budget for this call, hand back a dummy row keyed at the
                // current position so the client re-issues next() later.
                if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
                    byte[] rowKey = CellUtil.cloneRow(cell);
                    result.clear();
                    getDummyResult(rowKey, result);
                    return true;
                }
                // skip this row as it is invalid
                // if there is no more row, then result will be an empty list
            } while (hasMore);
            rowCount++;
            // Enforce the page size of any PageFilter that was stripped from
            // the scan (see pageSize); false ends the page for the client.
            if (rowCount == pageSize) {
                return false;
            }
            return hasMore;
        } catch (Throwable t) {
            // Re-throw everything as an IOException tagged with the region name.
            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
            return false; // impossible
        }
    }
+
    /**
     * {@inheritDoc}
     *
     * Delegates to {@link #next(List, boolean)} in non-raw mode.
     */
    @Override
    public boolean next(List<Cell> result) throws IOException {
        return next(result, false);
    }
+
    /**
     * {@inheritDoc}
     *
     * Delegates to {@link #next(List, boolean)} in raw mode.
     */
    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
        return next(result, true);
    }
+
+    private boolean verifyRowAndRepairIfNecessary(List<Cell> cellList) throws 
IOException {
+        // check if row is VERIFIED
+        if (verifyRow(cellList)) {
+            return true;
+        }
+        else {
+            try {
+                if (externalScanner == null) {
+                    PageFilter pageFilter = removePageFilter(scan);
+                    if (pageFilter != null) {
+                        pageSize = pageFilter.getPageSize();
+                        restartScanDueToPageFilterRemoval = true;
+                    }
+                    externalScanner = new Scan();
+                }
+                repairRow(cellList);
+            } catch (IOException e) {
+                LOGGER.warn("Row Repair failure on region {}.", 
env.getRegionInfo().getRegionNameAsString());
+                throw e;
+            }
+
+            if (cellList.isEmpty()) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+    private PageFilter removePageFilterFromFilterList(FilterList filterList) {

Review Comment:
   Refactored page filter removal based on GlobalIndexChecker





> Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK
> ---------------------------------------------------------------
>
>                 Key: PHOENIX-6141
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6141
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.0.0, 4.15.0
>            Reporter: Chinmay Kulkarni
>            Assignee: Palash Chauhan
>            Priority: Blocker
>             Fix For: 5.2.0, 5.1.4
>
>
> Before 4.15, "CREATE/DROP VIEW" was an atomic operation since we were issuing 
> batch mutations on just the 1 SYSTEM.CATALOG region. In 4.15 we introduced 
> SYSTEM.CHILD_LINK to store the parent->child links and so a CREATE VIEW is no 
> longer atomic since it consists of 2 separate RPCs  (1 to SYSTEM.CHILD_LINK 
> to add the linking row and another to SYSTEM.CATALOG to write metadata for 
> the new view). 
> If the second RPC i.e. the RPC to write metadata to SYSTEM.CATALOG fails 
> after the 1st RPC has already gone through, there will be an inconsistency 
> between both metadata tables. We will see orphan parent->child linking rows 
> in SYSTEM.CHILD_LINK in this case. This can cause the following issues:
> # ALTER TABLE calls on the base table will fail
> # DROP TABLE without CASCADE will fail
> # The upgrade path has calls like UpgradeUtil.upgradeTable() which will fail
> # Any metadata consistency checks can be thrown off
> # Unnecessary extra storage of orphan links
> The first 3 issues happen because we wrongly deduce that a base table has 
> child views due to the orphan linking rows.
> This Jira aims to find a way to make mutations to SYSTEM.CATALOG and 
> SYSTEM.CHILD_LINK an atomic transaction. We can use a 2-phase commit 
> approach like the one in global indexing, or potentially explore using a 
> transaction manager. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to