[
https://issues.apache.org/jira/browse/PHOENIX-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708618#comment-17708618
]
ASF GitHub Bot commented on PHOENIX-6888:
-----------------------------------------
jpisaac commented on code in PR #1569:
URL: https://github.com/apache/phoenix/pull/1569#discussion_r1157848028
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+
+/**
+ * TTLRegionScanner masks expired rows using the empty column cell timestamp
+ */
+public class TTLRegionScanner extends BaseRegionScanner {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TTLRegionScanner.class);
+ private final boolean isMaskingEnabled;
+ private final RegionCoprocessorEnvironment env;
+ private Scan scan;
+ private long rowCount = 0;
+ private long pageSize = Long.MAX_VALUE;
+ long ttl;
+ long ttlWindowStart;
+ byte[] emptyCQ;
+ byte[] emptyCF;
+ private boolean initialized = false;
+
+    public TTLRegionScanner(final RegionCoprocessorEnvironment env, final Scan scan,
+            final RegionScanner s) {
+ super(s);
+ this.env = env;
+ this.scan = scan;
+ emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+ emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
Review Comment:
Shouldn't the current time be the SCN time if one is set (i.e., if this is an
SCN query)?
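For illustration, the kind of choice being suggested might look like the
following self-contained sketch. The names, and the idea that the SCN surfaces
as the scan's time-range upper bound, are assumptions for the sketch, not
Phoenix API:

```java
// Sketch (not Phoenix code): prefer a client-supplied upper time bound,
// such as an SCN, over the wall clock when deciding what "now" means for
// TTL masking. timeRangeMax stands in for scan.getTimeRange().getMax();
// Long.MAX_VALUE means the client set no upper bound.
public class ScanTime {
    static long effectiveNow(long timeRangeMax, long wallClockMillis) {
        // An SCN query bounds visibility at the SCN, so expiry should be
        // evaluated relative to that instant, not the current server time.
        return timeRangeMax != Long.MAX_VALUE ? timeRangeMax : wallClockMillis;
    }
}
```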
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java:
##########
@@ -87,4 +88,8 @@ public int getBatch() {
public RegionInfo getRegionInfo() {
return delegate.getRegionInfo();
}
+
+ public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+ return ((DelegateRegionScanner)delegate).getNewRegionScanner(scan);
Review Comment:
Casting without first checking that the instance is a DelegateRegionScanner
can lead to a ClassCastException.
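The defensive pattern being asked for can be sketched as follows. `Scanner`
and `DelegateScanner` are stand-ins for the real RegionScanner and
DelegateRegionScanner types; only the guard-before-cast pattern is the point:

```java
// Sketch of the defensive pattern: check the runtime type before
// downcasting, and fail with a descriptive error otherwise.
interface Scanner {}

class DelegateScanner implements Scanner {
    Scanner getNewScanner() {
        return new DelegateScanner();
    }
}

public class SafeCast {
    static Scanner newScannerFrom(Scanner delegate) {
        if (delegate instanceof DelegateScanner) {
            return ((DelegateScanner) delegate).getNewScanner();
        }
        // Throw a descriptive exception instead of letting an unchecked
        // ClassCastException surface from an unguarded cast.
        throw new IllegalStateException(
                "delegate is not a DelegateScanner: " + delegate.getClass());
    }
}
```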
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java:
##########
@@ -354,10 +361,14 @@ public boolean nextRaw(List<Cell> result) throws IOException {
overrideDelegate();
return super.nextRaw(result);
}
+    @Override
+    public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+        return new RegionScannerHolder(c, scan,
+                ((DelegateRegionScanner) delegate).getNewRegionScanner(scan));
Review Comment:
Casting without first checking that the instance is a DelegateRegionScanner
can lead to a ClassCastException.
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java:
##########
@@ -0,0 +1,220 @@
+    public TTLRegionScanner(final RegionCoprocessorEnvironment env, final Scan scan,
+            final RegionScanner s) {
+        super(s);
+        this.env = env;
+        this.scan = scan;
+        emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+        emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        ttl = env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
+        ttl *= 1000;
+        // If the Phoenix Table TTL feature is disabled cluster-wide, or the client is an
+        // older client that does not supply the empty column parameters, the masking
+        // should not be done here.
+        isMaskingEnabled = emptyCF != null && emptyCQ != null &&
+                env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+                        QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
+ }
+
+    private void init() throws IOException {
+        // HBase PageFilter will also count the expired rows.
+        // Instead of using PageFilter for counting, we will count returned rows here.
+        PageFilter pageFilter = ScanUtil.removePageFilter(scan);
Review Comment:
Not sure I understand why we are removing the PageFilter here and not in the
PagingRegionScanner. Also, if the PageFilter is not needed, why keep track of
the rowCount?
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java:
##########
@@ -0,0 +1,220 @@
+    private void init() throws IOException {
+        // HBase PageFilter will also count the expired rows.
+        // Instead of using PageFilter for counting, we will count returned rows here.
+        PageFilter pageFilter = ScanUtil.removePageFilter(scan);
+ if (pageFilter != null) {
+ pageSize = pageFilter.getPageSize();
+ delegate.close();
+            delegate = ((DelegateRegionScanner) delegate).getNewRegionScanner(scan);
+ }
+ }
+
+ private boolean isExpired(List<Cell> result) throws IOException {
+ long maxTimestamp = 0;
+ long minTimestamp = Long.MAX_VALUE;
+ long ts;
+ boolean found = false;
+ for (Cell c : result) {
+ ts = c.getTimestamp();
+ if (!found && ScanUtil.isEmptyColumn(c, emptyCF, emptyCQ)) {
+ if (ts < ttlWindowStart) {
+ return true;
+ }
+ found = true;
+ }
+ if (maxTimestamp < ts) {
+ maxTimestamp = ts;
+ }
+ if (minTimestamp > ts) {
+ minTimestamp = ts;
+ }
+ }
+ if (!found) {
+            LOG.warn("No empty column cell " + env.getRegion().getRegionInfo().getTable());
+ }
+ if (maxTimestamp - minTimestamp <= ttl) {
+ return false;
+ }
+        // We need to check if the gap between two consecutive cell timestamps is more
+        // than ttl and, if so, trim the cells beyond the gap.
+ Scan singleRowScan = new Scan();
+ singleRowScan.setRaw(true);
+ singleRowScan.readAllVersions();
+        singleRowScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+ byte[] rowKey = CellUtil.cloneRow(result.get(0));
+ singleRowScan.withStartRow(rowKey, true);
+ singleRowScan.withStopRow(rowKey, true);
+        RegionScanner scanner = ((DelegateRegionScanner) delegate).getNewRegionScanner(singleRowScan);
+ List<Cell> row = new ArrayList<>();
+ scanner.next(row);
+ scanner.close();
+ if (row.isEmpty()) {
+ return true;
+ }
+ int size = row.size();
+ long tsArray[] = new long[size];
+ int i = 0;
+ for (Cell cell : row) {
+ tsArray[i++] = cell.getTimestamp();
+ }
+ Arrays.sort(tsArray);
+ for (i = size - 1; i > 0; i--) {
+ if (tsArray[i] - tsArray[i - 1] > ttl) {
+ minTimestamp = tsArray[i];
+ break;
+ }
+ }
+ Iterator<Cell> iterator = result.iterator();
+ while(iterator.hasNext()) {
+ if (iterator.next().getTimestamp() < minTimestamp) {
+ iterator.remove();
+ }
+ }
+ return false;
+ }
+
+    private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore) throws IOException {
+ boolean expired = isExpired(result);
+ if (!expired) {
+ return hasMore;
+ }
+ result.clear();
+ if (!hasMore) {
+ return false;
+ }
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ do {
+ hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+ if (result.isEmpty() || ScanUtil.isDummy(result)) {
+ return hasMore;
+ }
+ if (!isExpired(result)) {
+ return hasMore;
+ }
+ Cell cell = result.get(0);
+ result.clear();
+            if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSize) {
Review Comment:
Related to the previous comment on the PageFilter: here we are comparing
elapsed time (milliseconds) against pageSize, which is a row count?
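The mismatch being flagged can be stated with a small sketch: a PageFilter
page size is a row budget, while the loop in question spends a time budget,
and the two should not be compared directly. The names here are illustrative,
not Phoenix API:

```java
// Sketch: a row-count page limit and an elapsed-time page limit are
// different budgets with different units; comparing elapsed milliseconds
// against a row count silently conflates the two.
public class PageBudget {
    static boolean rowBudgetExhausted(long rowsReturned, long pageSizeRows) {
        return rowsReturned >= pageSizeRows;
    }

    static boolean timeBudgetExhausted(long elapsedMillis, long pageTimeoutMillis) {
        return elapsedMillis >= pageTimeoutMillis;
    }
}
```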
> Fixing TTL and Max Lookback Issues for Phoenix Tables
> -----------------------------------------------------
>
> Key: PHOENIX-6888
> URL: https://issues.apache.org/jira/browse/PHOENIX-6888
> Project: Phoenix
> Issue Type: Bug
> Affects Versions: 5.1.3
> Reporter: Kadir Ozdemir
> Assignee: Kadir Ozdemir
> Priority: Major
>
> In HBase, the unit of data is a cell, and data retention rules are executed at
> the cell level. These rules are defined at the column family level. Phoenix
> leverages the data retention features of HBase and exposes them to its users
> to provide its TTL feature at the table level. However, because these rules
> are defined at the cell level instead of the row level, they result in
> partial row retention, which in turn creates data integrity issues at the
> Phoenix level.
> Similarly, Phoenix’s max lookback feature leverages HBase’s deleted-data
> retention capabilities to preserve deleted cells within a configurable max
> lookback window. This requires two data retention windows: max lookback and
> TTL. One end of each window is the current time and the other end is a moment
> in the past (i.e., the current time minus the window size). Typically, the max
> lookback window is shorter than the TTL window. In the max lookback window, we
> would like to preserve the complete history of mutations regardless of how many
> cell versions these mutations generated. In the remaining TTL window outside
> max lookback, we would like to apply the data retention rules defined above.
> However, HBase provides only one data retention window. Thus, the max lookback
> window had to be extended to become the TTL window, and the max lookback
> feature ends up undesirably retaining deleted data for the maximum of the max
> lookback and TTL periods.
> This Jira is to fix both of these issues.
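The two windows described above can be made concrete with a small arithmetic
sketch (times in milliseconds; the method names are illustrative, not Phoenix
API):

```java
// Sketch of the two server-side retention windows. Cells newer than
// maxLookbackStart keep their full mutation history; cells older than
// ttlWindowStart are expired. Typically maxLookback < ttl, so
// ttlWindowStart < maxLookbackStart <= now.
public class RetentionWindows {
    static long ttlWindowStart(long nowMillis, long ttlSeconds) {
        return nowMillis - ttlSeconds * 1000L; // HBase TTLs are declared in seconds
    }

    static long maxLookbackStart(long nowMillis, long maxLookbackMillis) {
        return nowMillis - maxLookbackMillis;
    }

    static boolean isExpired(long cellTimestamp, long ttlWindowStartMillis) {
        return cellTimestamp < ttlWindowStartMillis;
    }
}
```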
--
This message was sent by Atlassian Jira
(v8.20.10#820010)