jpisaac commented on code in PR #1569:
URL: https://github.com/apache/phoenix/pull/1569#discussion_r1125123215


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/StoreCompactionScanner.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The store scanner that implements Phoenix TTL and Max Lookback
+ */
+public class StoreCompactionScanner implements InternalScanner {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StoreCompactionScanner.class);
+    private final InternalScanner storeScanner;
+    private final Region region;
+    private final Store store;
+    private final Configuration config;
+    private final RegionCoprocessorEnvironment env;
+    private long maxLookbackWindowStart;
+    private long ttlWindowStart;
+    private int minVersion;
+    private int maxVersion;
+    private final boolean firstStore;
+    private KeepDeletedCells keepDeletedCells;
+    private long compactionTime;
+
+    /**
+     * Wraps a store scanner for compaction and derives the time windows
+     * and column family settings that the scanner keeps as state.
+     *
+     * @param env the coprocessor environment; supplies the region and
+     *            the configuration
+     * @param store the store whose column family descriptor is read
+     * @param storeScanner the scanner this instance delegates to
+     * @param maxLookbackInMs milliseconds subtracted from the compaction
+     *                        time to get the max lookback window start
+     */
+    public StoreCompactionScanner(RegionCoprocessorEnvironment env,
+                                Store store,
+                                InternalScanner storeScanner,
+                                long maxLookbackInMs) {
+        this.storeScanner = storeScanner;
+        this.region = env.getRegion();
+        this.store = store;
+        this.env = env;
+        this.config = env.getConfiguration();
+
+        // One clock reading anchors both windows.
+        this.compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+        this.maxLookbackWindowStart = this.compactionTime - maxLookbackInMs;
+
+        ColumnFamilyDescriptor familyDescriptor =
+                store.getColumnFamilyDescriptor();
+        long familyTtl = familyDescriptor.getTimeToLive();
+        if (familyTtl == HConstants.FOREVER) {
+            // A TTL of FOREVER pins the window start to 1.
+            this.ttlWindowStart = 1;
+        } else {
+            // The TTL is scaled by 1000 before it is subtracted from the
+            // millisecond compaction time.
+            this.ttlWindowStart = this.compactionTime - familyTtl * 1000;
+        }
+        this.minVersion = familyDescriptor.getMinVersions();
+        this.maxVersion = familyDescriptor.getMaxVersions();
+        this.keepDeletedCells = familyDescriptor.getKeepDeletedCells();
+
+        // True when this store is the first one listed by the region.
+        Store leadingStore = region.getStores().get(0);
+        this.firstStore = leadingStore.getColumnFamilyName()
+                .equals(store.getColumnFamilyName());
+    }
+
+    /**
+     * Reads the next batch from the wrapped scanner, hands it to
+     * {@code filter(result, true)} and then sorts it in place with the
+     * default cell comparator. All of this happens while holding the
+     * wrapped scanner's monitor.
+     *
+     * @param result the list the wrapped scanner fills
+     * @return the value returned by the wrapped scanner's {@code next}
+     * @throws IOException declared by the wrapped scanner's {@code next}
+     */
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        synchronized (storeScanner) {
+            final boolean moreRows = storeScanner.next(result);
+            filter(result, true);
+            result.sort(CellComparator.getInstance());
+            return moreRows;
+        }
+    }
+
+    /**
+     * Delegates to {@link #next(List)}. The supplied scanner context is
+     * not used.
+     *
+     * @param result the list passed through to {@link #next(List)}
+     * @param scannerContext ignored by this implementation
+     * @return whatever {@link #next(List)} returns
+     * @throws IOException if {@link #next(List)} throws it
+     */
+    @Override
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
+
+    /**
+     * Closes the wrapped store scanner.
+     *
+     * @throws IOException if closing the wrapped scanner fails
+     */
+    @Override
+    public void close() throws IOException {
+        storeScanner.close();
+    }
+    /**
+     * Groups consecutive cells that share a qualifier into per-column
+     * lists and collects every non-Put cell as a delete marker.
+     *
+     * Only the qualifier bytes are compared, and only against the first
+     * cell of the group currently being built, so grouping depends on the
+     * order of {@code result}. Non-Put cells are added to
+     * {@code deleteMarkers} and are also kept in their column group.
+     *
+     * @param result the cells to group, in iteration order
+     * @param columns receives one list per run of equal qualifiers
+     * @param deleteMarkers receives every cell whose type is not Put
+     */
+    private void formColumns(List<Cell> result, List<List<Cell>> columns,
+            List<Cell> deleteMarkers) {
+        List<Cell> group = null;
+        for (Cell cell : result) {
+            if (cell.getType() != Cell.Type.Put) {
+                deleteMarkers.add(cell);
+            }
+            boolean sameQualifier = false;
+            if (group != null) {
+                // The first cell of the open group stands for its column.
+                Cell head = group.get(0);
+                sameQualifier = Bytes.compareTo(cell.getQualifierArray(),
+                        cell.getQualifierOffset(),
+                        cell.getQualifierLength(),
+                        head.getQualifierArray(),
+                        head.getQualifierOffset(),
+                        head.getQualifierLength()) == 0;
+            }
+            if (!sameQualifier) {
+                // Flush the finished group, if any, and open a new one.
+                if (group != null) {
+                    columns.add(group);
+                }
+                group = new ArrayList<>();
+            }
+            group.add(cell);
+        }
+        // Flush the trailing group; absent only when result was empty.
+        if (group != null) {
+            columns.add(group);
+        }
+    }
+
+    /**
+     * A row version that does not share a cell with any other row version is 
called a
+     * compaction row version.
+     * The latest live or deleted row version at the compaction time 
(compactionTime) is the first
+     * compaction row version. The next row version which does not share a 
cell with the
+     * first compaction row version is the next compaction row version.
+     *
+     * The first compaction row version is a valid row version (i.e., a row 
version at a given
+     * time). The subsequent compactions row versions may not represent a 
valid row version if
+     * the rows are updated partially.
+     *
+     * Compaction row versions are used for compaction purposes to determine 
which row versions to
+     * retain.
+     */
+    class CompactionRowVersion {
+        // The cells that make up this row version
+        List<Cell> cells = new ArrayList<>();
+        // The family delete marker that deletes this row version;
+        // null until one is assigned
+        Cell deleteFamilyMarker;
+        // Delete or DeleteColumn markers that delete a cell of this
+        // version; created lazily, so null until the first one is added
+        List<Cell> columnDeleteMarkers;
+        // The timestamp of the row version
+        long ts;
+        // The version of the row version, i.e. the minimum of the
+        // versions of the cells it includes
+        int version;
+
+        // Appends a column-level delete marker, creating the list on
+        // first use.
+        private void addColumnDeleteMarker(Cell deleteMarker) {
+            List<Cell> markers = columnDeleteMarkers;
+            if (markers == null) {
+                markers = new ArrayList<>();
+                columnDeleteMarkers = markers;
+            }
+            markers.add(deleteMarker);
+        }
+    }
+
+    /**
+     * Looks for the first delete marker that matches the given cell and
+     * reports whether one was found.
+     *
+     * A marker matches when its qualifier equals the cell's qualifier
+     * and, unless {@code storeLevel} is set, its family equals the
+     * cell's family as well. Timestamps are not compared here. When the
+     * matching marker has type Delete it is removed from
+     * {@code deleteMarkers} and recorded on {@code rowVersion}; markers
+     * of any other type stay in the list.
+     *
+     * @param deleteMarkers candidate markers, scanned in list order; may
+     *                      lose one element
+     * @param rowVersion receives a consumed Delete marker
+     * @param cell the cell to test
+     * @param storeLevel when true the family comparison is skipped
+     * @return true if a matching marker was found, false otherwise
+     */
+    private boolean isCellDeleted(List<Cell> deleteMarkers,
+            CompactionRowVersion rowVersion,
+            Cell cell, boolean storeLevel) {
+        Iterator<Cell> markers = deleteMarkers.iterator();
+        while (markers.hasNext()) {
+            Cell marker = markers.next();
+            if (!storeLevel
+                    && Bytes.compareTo(cell.getFamilyArray(),
+                            cell.getFamilyOffset(),
+                            cell.getFamilyLength(),
+                            marker.getFamilyArray(),
+                            marker.getFamilyOffset(),
+                            marker.getFamilyLength()) != 0) {
+                continue;
+            }
+            if (Bytes.compareTo(cell.getQualifierArray(),
+                    cell.getQualifierOffset(),
+                    cell.getQualifierLength(),
+                    marker.getQualifierArray(),
+                    marker.getQualifierOffset(),
+                    marker.getQualifierLength()) != 0) {
+                continue;
+            }
+            if (marker.getType() == Cell.Type.Delete) {
+                // Move the consumed marker from the shared list onto
+                // the row version.
+                markers.remove();
+                rowVersion.addColumnDeleteMarker(marker);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Returns the largest timestamp among the leading cells of the
+     * given columns.
+     *
+     * Columns are visited in order. The scan stops at the first column
+     * whose leading cell is a DeleteFamily or DeleteFamilyVersion
+     * marker. Columns led by a Delete or DeleteColumn marker are
+     * skipped.
+     *
+     * @param columns per-column cell lists; each must be non-empty
+     * @return the maximum leading timestamp seen, or 0 if none counted
+     */
+    private long getNextRowVersionTimestamp(List<List<Cell>> columns) {
+        long latest = 0;
+        for (List<Cell> column : columns) {
+            Cell head = column.get(0);
+            Cell.Type headType = head.getType();
+            boolean familyMarker = headType == Cell.Type.DeleteFamily
+                    || headType == Cell.Type.DeleteFamilyVersion;
+            if (familyMarker) {
+                return latest;
+            }
+            boolean columnMarker = headType == Cell.Type.DeleteColumn
+                    || headType == Cell.Type.Delete;
+            if (!columnMarker) {
+                latest = Math.max(latest, head.getTimestamp());
+            }
+        }
+        return latest;
+    }
+
+    private boolean formRowVersions(List<List<Cell>> columns,
+                                    List<CompactionRowVersion> rowVersions,
+                                    boolean storeLevel) {
+        Cell lastDeleteFamilyMarker = null;
+        Cell lastDeleteFamilyVersionMarker = null;
+        List<Cell> columnDeleteMarkers = null;
+        int version = 0;
+        while (!columns.isEmpty()) {
+            long ts = getNextRowVersionTimestamp(columns);
+            CompactionRowVersion rowVersion = null;
+            // Form the next row version by picking the first cell from each 
column if the cell
+            // is not masked by a delete marker
+            for (List<Cell> column : columns) {
+                Cell firstCell = column.remove(0);
+                if (firstCell.getType() == Cell.Type.DeleteFamily ||
+                        firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+                    if (firstCell.getType() == Cell.Type.DeleteFamily) {
+                        if (firstCell.getTimestamp() >= ttlWindowStart &&

Review Comment:
   Don't we want to preserve the delete marker if it is inside the maxLookback 
window even if KEEP_DELETED_CELLS == FALSE?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to