[ https://issues.apache.org/jira/browse/PHOENIX-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708618#comment-17708618 ]

ASF GitHub Bot commented on PHOENIX-6888:
-----------------------------------------

jpisaac commented on code in PR #1569:
URL: https://github.com/apache/phoenix/pull/1569#discussion_r1157848028


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+
+/**
+ *  TTLRegionScanner masks expired rows using the empty column cell timestamp
+ */
+public class TTLRegionScanner extends BaseRegionScanner {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TTLRegionScanner.class);
+    private final boolean isMaskingEnabled;
+    private final RegionCoprocessorEnvironment env;
+    private Scan scan;
+    private long rowCount = 0;
+    private long pageSize = Long.MAX_VALUE;
+    long ttl;
+    long ttlWindowStart;
+    byte[] emptyCQ;
+    byte[] emptyCF;
+    private boolean initialized = false;
+
+    public TTLRegionScanner(final RegionCoprocessorEnvironment env, final Scan scan,
+            final RegionScanner s) {
+        super(s);
+        this.env = env;
+        this.scan = scan;
+        emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+        emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();

Review Comment:
   Shouldn't the current time be the SCN time if one is set (i.e., if it is an SCN query)?
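A minimal sketch of the reviewer's suggestion (hypothetical helper, not a Phoenix API; the assumption here is that the SCN, when present, arrives as the scan's max time range, and that `Long.MAX_VALUE`, HBase's `LATEST_TIMESTAMP`, means no SCN was set):

```java
// Hypothetical helper: prefer the SCN (client-supplied point-in-time) over
// wall-clock time when computing the TTL window start.
class TtlClock {
    static final long LATEST_TIMESTAMP = Long.MAX_VALUE;

    // Returns the SCN if one is set, otherwise the wall-clock time.
    static long effectiveNow(long scanMaxTimeRange, long wallClockMillis) {
        return scanMaxTimeRange != LATEST_TIMESTAMP ? scanMaxTimeRange : wallClockMillis;
    }
}
```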



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java:
##########
@@ -87,4 +88,8 @@ public int getBatch() {
     public RegionInfo getRegionInfo() {
         return delegate.getRegionInfo();
     }
+
+    public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+        return ((DelegateRegionScanner)delegate).getNewRegionScanner(scan);

Review Comment:
   Casting without checking that the instance is of type DelegateRegionScanner can lead to a ClassCastException.
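An illustrative sketch of the guarded cast the reviewer is asking for, using simplified stand-in types (these are not the real HBase/Phoenix classes): the unchecked downcast fails with a clear error instead of an unexpected ClassCastException.

```java
// Simplified stand-in for RegionScanner.
interface Scanner {}

// Simplified stand-in for DelegateRegionScanner.
class DelegatingScanner implements Scanner {
    Scanner newScanner() {
        return new DelegatingScanner();
    }
}

class SafeCast {
    // Guard the downcast: only a DelegatingScanner can create a new scanner.
    static Scanner getNewScanner(Scanner delegate) {
        if (delegate instanceof DelegatingScanner) {
            return ((DelegatingScanner) delegate).newScanner();
        }
        throw new UnsupportedOperationException(
                "delegate cannot create a new scanner: " + delegate.getClass().getName());
    }
}
```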



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java:
##########
@@ -354,10 +361,14 @@ public boolean nextRaw(List<Cell> result) throws IOException {
                 overrideDelegate();
                 return super.nextRaw(result);
             }
+            @Override
+            public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+                return new RegionScannerHolder(c, scan,
+                        ((DelegateRegionScanner) delegate).getNewRegionScanner(scan));

Review Comment:
   Casting without checking that the instance is of type DelegateRegionScanner can lead to a ClassCastException.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java:
##########
@@ -0,0 +1,220 @@
+        ttl = env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
+        ttl *= 1000;
+        // Regardless if the Phoenix Table TTL feature is disabled cluster wide or the client is
+        // an older client and does not supply the empty column parameters, the masking should not
+        // be done here.
+        isMaskingEnabled = emptyCF != null && emptyCQ != null &&
+                env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+                        QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
+    }
+
+    private void init() throws IOException {
+        // HBase PageFilter will also count the expired rows.
+        // Instead of using PageFilter for counting, we will count returned rows here.
+        PageFilter pageFilter = ScanUtil.removePageFilter(scan);

Review Comment:
   Not sure I understand why we are removing the PageFilter here and not in the PagingRegionScanner. Also, if the PageFilter is not needed, then why keep track of the rowCount?



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java:
##########
@@ -0,0 +1,220 @@
+    private void init() throws IOException {
+        // HBase PageFilter will also count the expired rows.
+        // Instead of using PageFilter for counting, we will count returned rows here.
+        PageFilter pageFilter = ScanUtil.removePageFilter(scan);
+        if (pageFilter != null) {
+            pageSize = pageFilter.getPageSize();
+            delegate.close();
+            delegate = ((DelegateRegionScanner)delegate).getNewRegionScanner(scan);
+        }
+    }
+
+    private boolean isExpired(List<Cell> result) throws IOException {
+        long maxTimestamp = 0;
+        long minTimestamp = Long.MAX_VALUE;
+        long ts;
+        boolean found = false;
+        for (Cell c : result) {
+            ts = c.getTimestamp();
+            if (!found && ScanUtil.isEmptyColumn(c, emptyCF, emptyCQ)) {
+                if (ts < ttlWindowStart) {
+                    return true;
+                }
+                found = true;
+            }
+            if (maxTimestamp < ts) {
+                maxTimestamp = ts;
+            }
+            if (minTimestamp > ts) {
+                minTimestamp = ts;
+            }
+        }
+        if (!found) {
+            LOG.warn("No empty column cell " + env.getRegion().getRegionInfo().getTable());
+        }
+        if (maxTimestamp - minTimestamp <= ttl) {
+            return false;
+        }
+        // We need to check if the gap between two consecutive cell timestamps is more than ttl
+        // and if so trim the cells beyond the gap
+        Scan singleRowScan = new Scan();
+        singleRowScan.setRaw(true);
+        singleRowScan.readAllVersions();
+        singleRowScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        byte[] rowKey = CellUtil.cloneRow(result.get(0));
+        singleRowScan.withStartRow(rowKey, true);
+        singleRowScan.withStopRow(rowKey, true);
+        RegionScanner scanner = ((DelegateRegionScanner)delegate).getNewRegionScanner(singleRowScan);
+        List<Cell> row = new ArrayList<>();
+        scanner.next(row);
+        scanner.close();
+        if (row.isEmpty()) {
+            return true;
+        }
+        int size = row.size();
+        long tsArray[] = new long[size];
+        int i = 0;
+        for (Cell cell : row) {
+            tsArray[i++] = cell.getTimestamp();
+        }
+        Arrays.sort(tsArray);
+        for (i = size - 1; i > 0; i--) {
+            if (tsArray[i] - tsArray[i - 1] > ttl) {
+                minTimestamp = tsArray[i];
+                break;
+            }
+        }
+        Iterator<Cell> iterator = result.iterator();
+        while(iterator.hasNext()) {
+            if (iterator.next().getTimestamp() < minTimestamp) {
+                iterator.remove();
+            }
+        }
+        return false;
+    }
+
+    private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore) throws IOException {
+        boolean expired = isExpired(result);
+        if (!expired) {
+            return hasMore;
+        }
+        result.clear();
+        if (!hasMore) {
+            return false;
+        }
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        do {
+            hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+            if (result.isEmpty() || ScanUtil.isDummy(result)) {
+                return hasMore;
+            }
+            if (!isExpired(result)) {
+                return hasMore;
+            }
+            Cell cell = result.get(0);
+            result.clear();
+            if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSize) {

Review Comment:
   Related to the previous comment on the PageFilter. Here we are comparing the elapsed time with the number of rows (pageSize)?
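The mismatch the reviewer points out, sketched with plain types (assumption: `pageSize` was taken from `PageFilter.getPageSize()`, which is a row count, while the comparison above is against elapsed milliseconds):

```java
// A row-count page size bounds the number of rows returned; a server-side time
// budget bounds elapsed milliseconds. The two limits are not interchangeable.
class PagingCheck {
    // Row-based paging: stop once pageSizeRows rows have been returned.
    static boolean rowLimitReached(long rowsReturned, long pageSizeRows) {
        return rowsReturned >= pageSizeRows;
    }

    // Time-based paging: stop once pageTimeMs milliseconds have elapsed.
    static boolean timeLimitReached(long startMs, long nowMs, long pageTimeMs) {
        return nowMs - startMs > pageTimeMs;
    }
}
```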





> Fixing TTL and Max Lookback Issues for Phoenix Tables
> -----------------------------------------------------
>
>                 Key: PHOENIX-6888
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6888
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 5.1.3
>            Reporter: Kadir Ozdemir
>            Assignee: Kadir Ozdemir
>            Priority: Major
>
> In HBase, the unit of data is a cell, and data retention rules are executed at 
> the cell level. These rules are defined at the column family level. Phoenix 
> leverages the data retention features of HBase and exposes them to its users 
> to provide its TTL feature at the table level. However, because these rules 
> are defined at the cell level instead of the row level, they result in 
> partial row retention, which in turn creates data integrity issues at the 
> Phoenix level. 
> Similarly, Phoenix's max lookback feature leverages HBase's deleted data 
> retention capabilities to preserve deleted cells within a configurable max 
> lookback window. This requires two data retention windows, max lookback and 
> TTL. One end of each window is the current time, and the other end is a 
> moment in the past (i.e., current time minus the window size). Typically, the 
> max lookback window is shorter than the TTL window. In the max lookback 
> window, we would like to preserve the complete history of mutations, 
> regardless of how many cell versions these mutations generated. In the part 
> of the TTL window outside the max lookback, we would like to apply the data 
> retention rules defined above. However, HBase provides only one data 
> retention window. Thus, the max lookback window had to be extended to become 
> the TTL window, and the max lookback feature results in unintentionally 
> retaining deleted data for the longer of the max lookback and TTL periods. 
> This Jira is to fix both of these issues.
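The two retention windows described above come down to simple boundary arithmetic (illustrative only; the names are made up, not Phoenix APIs): both windows end at the current time and start at the current time minus their size, so a shorter max lookback window starts later than the TTL window.

```java
// Each retention window ends at "now" and starts windowSizeMs earlier.
class RetentionWindows {
    static long windowStart(long nowMs, long windowSizeMs) {
        return nowMs - windowSizeMs;
    }
}
```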



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
