[ 
https://issues.apache.org/jira/browse/PHOENIX-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707438#comment-17707438
 ] 

ASF GitHub Bot commented on PHOENIX-6888:
-----------------------------------------

jpisaac commented on code in PR #1569:
URL: https://github.com/apache/phoenix/pull/1569#discussion_r1154801675


##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java:
##########
@@ -165,7 +165,7 @@ public void testBytesRowsForSelectWithLimitIgnored() throws 
Exception {
             ResultSet rs = conn.createStatement().executeQuery(sql);
             assertFalse(rs.next());
             Estimate info = getByteRowEstimates(conn, sql, binds);
-            assertEquals((Long) 390L, info.estimatedBytes);
+            assertEquals((Long) 691L, info.estimatedBytes);

Review Comment:
   What caused the increased estimates?



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java:
##########
@@ -68,10 +70,14 @@
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class GlobalIndexCheckerIT extends BaseTest {
+    private static final Logger LOG =

Review Comment:
   nit: Remove unused variables



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java:
##########
@@ -204,7 +204,7 @@ public void testBytesRowsForSelectOnIndex() throws 
Exception {
                 assertTrue(rs.next());
             }
             Estimate info = getByteRowEstimates(conn, sql, binds);
-            assertEquals((Long) 390L, info.estimatedBytes);
+            assertEquals((Long) 691L, info.estimatedBytes);

Review Comment:
   same here



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
+
+/**
+ * The store scanner that implements Phoenix TTL and Max Lookback. Phoenix 
overrides the
+ * HBase implementation of data retention policies which is built at the cell 
level, and implements
+ * its row level data retention within this store scanner.
+ */
+public class CompactionScanner implements InternalScanner {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CompactionScanner.class);
+    public static final String SEPARATOR = ":";
+    private final InternalScanner storeScanner;
+    private final Region region;
+    private final Store store;
+    private final Configuration config;
+    private final RegionCoprocessorEnvironment env;
+    private long maxLookbackWindowStart;
+    private long ttlWindowStart;
+    private long ttl;
+    private final long maxLookbackInMillis;
+    private int minVersion;
+    private int maxVersion;
+    private final boolean emptyCFStore;
+    private KeepDeletedCells keepDeletedCells;
+    private long compactionTime;
+    private final byte[] emptyCF;
+    private final byte[] emptyCQ;
+    private static Map<String, Long> maxLookbackMap = new 
ConcurrentHashMap<>();
+    private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
+    private HBaseLevelRowCompactor hBaseLevelRowCompactor;
+
+    public CompactionScanner(RegionCoprocessorEnvironment env,
+            Store store,
+            InternalScanner storeScanner,
+            long maxLookbackInMillis,
+            byte[] emptyCF,
+            byte[] emptyCQ) {
+        this.storeScanner = storeScanner;
+        this.region = env.getRegion();
+        this.store = store;
+        this.env = env;
+        this.emptyCF = emptyCF;
+        this.emptyCQ = emptyCQ;
+        this.config = env.getConfiguration();
+        compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+        this.maxLookbackInMillis = maxLookbackInMillis;
+        String columnFamilyName = store.getColumnFamilyName();
+        String tableName = region.getRegionInfo().getTable().getNameAsString();
+        Long overriddenMaxLookback =
+                maxLookbackMap.remove(tableName + SEPARATOR + 
columnFamilyName);
+        this.maxLookbackWindowStart = compactionTime - (overriddenMaxLookback 
== null ?
+                maxLookbackInMillis : Math.max(maxLookbackInMillis, 
overriddenMaxLookback)) - 1;
+        ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
+        ttl = cfd.getTimeToLive();
+        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - 
ttl * 1000;
+        ttl *= 1000;
+        this.maxLookbackWindowStart = Math.max(ttlWindowStart, 
maxLookbackWindowStart);
+        this.minVersion = cfd.getMinVersions();
+        this.maxVersion = cfd.getMaxVersions();
+        this.keepDeletedCells = cfd.getKeepDeletedCells();
+        emptyCFStore = region.getTableDescriptor().getColumnFamilies().length 
== 1 ||
+                columnFamilyName.equals(Bytes.toString(emptyCF)) ||
+                columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
+        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
+        hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+    }
+
+    /**
+     * Any coprocessors within a JVM can extend the max lookback window for a 
column family
+     * by calling this static method.
+     */
+    public static void overrideMaxLookback(String tableName, String 
columnFamilyName,
+            long maxLookbackInMillis) {
+        if (tableName == null || columnFamilyName == null) {
+            return;
+        }
+        Long old = maxLookbackMap.putIfAbsent(tableName + SEPARATOR + 
columnFamilyName,
+                maxLookbackInMillis);
+        if (old != null && old < maxLookbackInMillis) {
+            maxLookbackMap.put(tableName + SEPARATOR + columnFamilyName, 
maxLookbackInMillis);
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        boolean hasMore = storeScanner.next(result);
+        if (!result.isEmpty()) {
+            phoenixLevelRowCompactor.compact(result, false);
+        }
+        return hasMore;
+    }
+
+    @Override
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
+
+    @Override
+    public void close() throws IOException {
+        storeScanner.close();
+    }
+
+    /**
+     * The context for a given row during compaction. A row may have multiple 
compaction row
+     * versions. CompactionScanner uses the same row context for these 
versions.
+     */
+    class RowContext {
+        Cell familyDeleteMarker = null;
+        Cell familyVersionDeleteMarker = null;
+        List<Cell> columnDeleteMarkers = null;
+        int version = 0;
+        long maxTimestamp;
+        long minTimestamp;
+        private void addColumnDeleteMarker(Cell deleteMarker) {
+            if (columnDeleteMarkers == null) {
+                columnDeleteMarkers = new ArrayList<>();
+            }
+            columnDeleteMarkers.add(deleteMarker);
+        }
+    }
+
+    /**
+     * This method finds out the maximum and minimum timestamp of the cells of 
the next row
+     * version.
+     *
+     * @param columns
+     * @param rowContext
+     */
+    private void getNextRowVersionTimestamp(LinkedList<LinkedList<Cell>> 
columns,
+            RowContext rowContext) {
+        rowContext.maxTimestamp = 0;
+        rowContext.minTimestamp = Long.MAX_VALUE;
+        long ts;
+        long currentDeleteFamilyTimestamp = 0;
+        long nextDeleteFamilyTimestamp = 0;
+        boolean firstColumn = true;
+        for (LinkedList<Cell> column : columns) {
+            Cell firstCell = column.getFirst();
+            ts = firstCell.getTimestamp();
+            if (ts <= nextDeleteFamilyTimestamp) {
+                continue;
+            }
+            if (firstCell.getType() == Cell.Type.DeleteFamily ||
+                    firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+                if (firstColumn) {
+                    // Family delete markers are always found in the first 
column of a column family
+                    // When Phoenix deletes a row, it places a family delete 
marker in each column
+                    // family with the same timestamp. We just need to process 
the delete column
+                    // family markers of the first column family, which would 
be in the column
+                    // with columnIndex=0.
+                    currentDeleteFamilyTimestamp = firstCell.getTimestamp();
+                    // We need to check if the next delete family marker 
exits. If so, we need
+                    // to record its timestamp as by definition a compaction 
row version cannot
+                    // cross a family delete marker
+                    if (column.size() > 1) {
+                        nextDeleteFamilyTimestamp = 
column.get(1).getTimestamp();
+                    } else {
+                        nextDeleteFamilyTimestamp = 0;
+                    }
+                }
+            } if (firstCell.getType() == Cell.Type.Put) {

Review Comment:
   Not sure whether this is a typo wanted elseif or wanted an if on a separate 
line. Logically the two separate if's stmts are correct since they cannot be 
true at the same time.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+
+/**
+ *  TTLRegionScanner masks expired rows using the empty column cell timestamp
+ */
+public class TTLRegionScanner extends BaseRegionScanner {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TTLRegionScanner.class);
+    private final boolean isPhoenixTableTTLEnabled;
+    private final RegionCoprocessorEnvironment env;
+    private Scan scan;
+    private long rowCount = 0;
+    private long pageSize = Long.MAX_VALUE;
+    long ttl;
+    long ttlWindowStart;
+    byte[] emptyCQ;
+    byte[] emptyCF;
+    private boolean initialized = false;
+
+    public TTLRegionScanner(final RegionCoprocessorEnvironment env, final Scan 
scan,
+            final RegionScanner s) {
+        super(s);
+        this.env = env;
+        this.scan = scan;
+        emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+        emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        ttl = 
env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 
1000;
+        ttl *= 1000;
+        isPhoenixTableTTLEnabled = emptyCF != null && emptyCQ != null &&

Review Comment:
   Logic for isPhoenixTableTTLEnabled is different here than in other classes.
   I am assuming the scan attributes being set or not is to check whether the 
client wants the rows masked or w/o masking/raw?
   
   Shouldn't we separate the 2 concerns?  PHOENIX_TABLE_TTL_ENABLED AND the 
client enabling it?
   
   AND maybe add more comments explaining the various cases





> Fixing TTL and Max Lookback Issues for Phoenix Tables
> -----------------------------------------------------
>
>                 Key: PHOENIX-6888
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6888
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 5.1.3
>            Reporter: Kadir Ozdemir
>            Assignee: Kadir Ozdemir
>            Priority: Major
>
> In HBase, the unit of data is a cell and data retention rules are executed at 
> the cell level. These rules are defined at the column family level. Phoenix 
> leverages the data retention features of HBase and exposes them to its users 
> to provide its TTL feature at the table level. However, these rules (since 
> they are defined at the cell level instead of the row level) results in 
> partial row retention that in turn creates data integrity issues at the 
> Phoenix level. 
> Similarly, Phoenix’s max lookback feature leverages HBase deleted data 
> retention capabilities to preserve deleted cells within a configurable max 
> lookback. This requires two data retention windows, max lookback and TTL. One 
> end of these windows is the current time and the end is a moment in the past 
> (i.e., current time minus the window size). Typically, the max lookback 
> window is shorter than the TTL window. In the max lookback window, we would 
> like to preserve the complete history of mutations regardless of how many 
> cell versions these mutations generated. In the remaining TTL window outside 
> the max lookback, we would like to apply the data retention rules defined 
> above. However, HBase provides only one data retention window. Thus, the max 
> lookback window had to be extended to become TTL window and the max lookback 
> feature results in unwantedly retaining deleted data for the maximum of max 
> lookback and TTL periods. 
> This Jira is to fix both of these issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to