[ https://issues.apache.org/jira/browse/PHOENIX-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817787#comment-17817787 ]

ASF GitHub Bot commented on PHOENIX-7108:
-----------------------------------------

kadirozde commented on code in PR #1799:
URL: https://github.com/apache/phoenix/pull/1799#discussion_r1491637691


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -84,45 +110,99 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
             Store store,
             InternalScanner storeScanner,
             long maxLookbackInMillis,
-            byte[] emptyCF,
-            byte[] emptyCQ,
-            int phoenixTTL,
-            boolean isSystemTable) {
+            PTable table) throws IOException {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
         this.store = store;
         this.env = env;
-        this.emptyCF = emptyCF;
-        this.emptyCQ = emptyCQ;
+        this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        this.emptyCQ = table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                QueryConstants.EMPTY_COLUMN_BYTES : table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
         this.config = env.getConfiguration();
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.maxLookbackInMillis = maxLookbackInMillis;
         String columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         String tableName = region.getRegionInfo().getTable().getNameAsString();
         Long overriddenMaxLookback =
                 maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
         maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
+
+        this.maxLookbackInMillis = maxLookbackInMillis;
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
         // for scn queries [0, scn). This means that the maxlookback size should be
         // maxLookbackInMillis + 1 so that the oldest scn does not return empty row
-        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
-                compactionTime : compactionTime - (maxLookbackInMillis + 1);
+        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ? compactionTime : compactionTime - (maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        this.ttl = isSystemTable ? cfd.getTimeToLive() : phoenixTTL;
-        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
-        ttl *= 1000;
-        this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
         this.minVersion = cfd.getMinVersions();
         this.maxVersion = cfd.getMaxVersions();
         this.keepDeletedCells = cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
-                        || localIndex;
-        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
-        hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) || localIndex;
+        // TODO: check if is it appropriate to throw an IOException here

Review Comment:
   Is this TODO still relevant?



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -160,17 +240,390 @@ public void close() throws IOException {
         storeScanner.close();
     }
 
+    private enum MatcherType {
+        VIEW_INDEXES, GLOBAL_VIEWS, TENANT_VIEWS
+    }
+
+    private interface TTLTracker {
+        void setTTL(Cell firstCell);
+        RowContext getRowContext();
+    }
+    private class NonPartitionedTableTTLTracker implements TTLTracker {
+
+        private long ttl;
+        private RowContext rowContext;
+
+        public NonPartitionedTableTTLTracker(PTable pTable, RegionCoprocessorEnvironment env, Store store) {
+            boolean isSystemTable = pTable.getType() == PTableType.SYSTEM;
+            if (isSystemTable) {
+                ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();

Review Comment:
   Can we have a config param, say, phoenix.system.catalog.ttl, to specify the 
ttl for the system table? We want to eliminate HBase TTL for all tables.
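
   A minimal sketch of how such a config-driven TTL lookup could work. Note that
phoenix.system.catalog.ttl is only the key proposed in this review, not an
existing Phoenix property; the sketch only uses the standard HBase
Configuration API:

    // Hypothetical sketch: "phoenix.system.catalog.ttl" is the key proposed in
    // this review, not an existing Phoenix configuration property.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;

    public class SystemTableTtlExample {
        // Proposed key and a conservative default (keep rows forever unless configured).
        static final String SYSTEM_CATALOG_TTL_KEY = "phoenix.system.catalog.ttl";
        static final int DEFAULT_SYSTEM_CATALOG_TTL = HConstants.FOREVER;

        // Resolve the TTL (in seconds) for system tables from the server config
        // instead of reading it from the HBase ColumnFamilyDescriptor.
        static int resolveSystemTableTtl(Configuration conf) {
            return conf.getInt(SYSTEM_CATALOG_TTL_KEY, DEFAULT_SYSTEM_CATALOG_TTL);
        }
    }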



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -84,45 +110,99 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
             Store store,
             InternalScanner storeScanner,
             long maxLookbackInMillis,
-            byte[] emptyCF,
-            byte[] emptyCQ,
-            int phoenixTTL,
-            boolean isSystemTable) {
+            PTable table) throws IOException {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
         this.store = store;
         this.env = env;
-        this.emptyCF = emptyCF;
-        this.emptyCQ = emptyCQ;
+        this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        this.emptyCQ = table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                QueryConstants.EMPTY_COLUMN_BYTES : table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
         this.config = env.getConfiguration();
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.maxLookbackInMillis = maxLookbackInMillis;
         String columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         String tableName = region.getRegionInfo().getTable().getNameAsString();
         Long overriddenMaxLookback =
                 maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
         maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
+
+        this.maxLookbackInMillis = maxLookbackInMillis;
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
         // for scn queries [0, scn). This means that the maxlookback size should be
         // maxLookbackInMillis + 1 so that the oldest scn does not return empty row
-        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
-                compactionTime : compactionTime - (maxLookbackInMillis + 1);
+        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ? compactionTime : compactionTime - (maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        this.ttl = isSystemTable ? cfd.getTimeToLive() : phoenixTTL;
-        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
-        ttl *= 1000;
-        this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
         this.minVersion = cfd.getMinVersions();
         this.maxVersion = cfd.getMaxVersions();
         this.keepDeletedCells = cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
-                        || localIndex;
-        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
-        hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) || localIndex;
+        // TODO: check if is it appropriate to throw an IOException here
+        TTLTracker ttlTracker = createTTLTrackerFor(env, store, table);
+        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(ttlTracker);
+        hBaseLevelRowCompactor = new HBaseLevelRowCompactor(ttlTracker);
+        LOGGER.info(String.format("CompactionScanner params:- (" +
+                        "physical-data-tablename = %s, compaction-tablename = %s, " +
+                        "emptyCF = %s, emptyCQ = %s, " +
+                        "minVersion = %d, maxVersion = %d, keepDeletedCells = %s, " +
+                        "familyCount = %d, localIndex = %s, emptyCFStore = %s, " +
+                        "compactionTime = %d, maxLookbackWindowStart = %d, maxLookbackInMillis = %d)",
+                table.getName().toString(), tableName,
+                Bytes.toString(this.emptyCF), Bytes.toString(emptyCQ),
+                this.minVersion, this.maxVersion, this.keepDeletedCells.name(),
+                this.familyCount, this.localIndex, this.emptyCFStore,
+                compactionTime, maxLookbackWindowStart, maxLookbackInMillis));
+
+    }
+
+    /**
+     * Helper method to create TTL tracker for various phoenix data model objects
+     * i.e views, view indexes ...
+     * @param env
+     * @param store
+     * @param baseTable
+     * @return
+     */
+    private TTLTracker createTTLTrackerFor(RegionCoprocessorEnvironment env,
+            Store store,  PTable baseTable) throws IOException {
+
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        String compactionTableName = env.getRegion().getRegionInfo().getTable().getNameAsString();
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(baseTable.getName().toString());
+        String tableName = SchemaUtil.getTableNameFromFullName(baseTable.getName().toString());
+
+        boolean isPartitionedIndexTable = false;
+        if (compactionTableName.startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
+            isPartitionedIndexTable = true;
+        }
+
+        // NonPartitioned: Salt bucket property can be separately set for base tables and indexes.
+        // Partitioned: Salt bucket property can be set only for the base table.
+        // Global views, Tenant views, view indexes inherit the salt bucket property from their
+        // base table.
+        boolean isSalted = baseTable.getBucketNum() != null;
+        try (PhoenixConnection serverConnection = QueryUtil.getConnectionOnServer(new Properties(),
+                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+
+            Table childLinkHTable = serverConnection.getQueryServices().getTable(SYSTEM_CHILD_LINK_NAME_BYTES);
+            // If there is atleast one child view for this table then it is a partitioned table.
+            boolean isPartitioned = ViewUtil.hasChildViews(
+                    childLinkHTable,
+                    EMPTY_BYTE_ARRAY,
+                    Bytes.toBytes(schemaName),
+                    Bytes.toBytes(tableName),
+                    currentTime);
+
+            return isPartitioned ?
+                    new PartitionedTableTTLTracker(baseTable, env, isSalted, isPartitionedIndexTable) :

Review Comment:
   View indexes are partitioned tables, but they do not have any children in
the child link table. This code does not handle them properly, does it?
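
   One possible shape for the check, sketched as a small helper that reuses the
identifiers from the hunk above (MetaDataUtil.VIEW_INDEX_TABLE_PREFIX,
ViewUtil.hasChildViews, EMPTY_BYTE_ARRAY). This is only an illustration of
treating view-index physical tables as partitioned, not the actual patch:

    // Illustrative fragment; identifiers are taken from the diff above and the
    // method is meant to slot into createTTLTrackerFor().
    private boolean isPartitionedStore(String compactionTableName, Table childLinkHTable,
            String schemaName, String tableName, long currentTime) throws IOException {
        // View index physical tables (prefixed with "_IDX_") hold the rows of all
        // view indexes on the base table, so they are partitioned even though they
        // never appear as parents in SYSTEM.CHILD_LINK.
        if (compactionTableName.startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
            return true;
        }
        // Base tables are partitioned when they have at least one child view.
        return ViewUtil.hasChildViews(childLinkHTable, EMPTY_BYTE_ARRAY,
                Bytes.toBytes(schemaName), Bytes.toBytes(tableName), currentTime);
    }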



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -84,45 +110,99 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
             Store store,
             InternalScanner storeScanner,
             long maxLookbackInMillis,
-            byte[] emptyCF,
-            byte[] emptyCQ,
-            int phoenixTTL,
-            boolean isSystemTable) {
+            PTable table) throws IOException {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
         this.store = store;
         this.env = env;
-        this.emptyCF = emptyCF;
-        this.emptyCQ = emptyCQ;
+        this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        this.emptyCQ = table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                QueryConstants.EMPTY_COLUMN_BYTES : table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
         this.config = env.getConfiguration();
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.maxLookbackInMillis = maxLookbackInMillis;
         String columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         String tableName = region.getRegionInfo().getTable().getNameAsString();
         Long overriddenMaxLookback =
                 maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
         maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
+
+        this.maxLookbackInMillis = maxLookbackInMillis;
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
         // for scn queries [0, scn). This means that the maxlookback size should be
         // maxLookbackInMillis + 1 so that the oldest scn does not return empty row
-        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
-                compactionTime : compactionTime - (maxLookbackInMillis + 1);
+        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ? compactionTime : compactionTime - (maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        this.ttl = isSystemTable ? cfd.getTimeToLive() : phoenixTTL;
-        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
-        ttl *= 1000;
-        this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
         this.minVersion = cfd.getMinVersions();
         this.maxVersion = cfd.getMaxVersions();
         this.keepDeletedCells = cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
-                        || localIndex;
-        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
-        hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) || localIndex;
+        // TODO: check if is it appropriate to throw an IOException here
+        TTLTracker ttlTracker = createTTLTrackerFor(env, store, table);
+        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(ttlTracker);
+        hBaseLevelRowCompactor = new HBaseLevelRowCompactor(ttlTracker);
+        LOGGER.info(String.format("CompactionScanner params:- (" +
+                        "physical-data-tablename = %s, compaction-tablename = %s, " +
+                        "emptyCF = %s, emptyCQ = %s, " +
+                        "minVersion = %d, maxVersion = %d, keepDeletedCells = %s, " +
+                        "familyCount = %d, localIndex = %s, emptyCFStore = %s, " +
+                        "compactionTime = %d, maxLookbackWindowStart = %d, maxLookbackInMillis = %d)",
+                table.getName().toString(), tableName,
+                Bytes.toString(this.emptyCF), Bytes.toString(emptyCQ),
+                this.minVersion, this.maxVersion, this.keepDeletedCells.name(),
+                this.familyCount, this.localIndex, this.emptyCFStore,
+                compactionTime, maxLookbackWindowStart, maxLookbackInMillis));
+
+    }
+
+    /**
+     * Helper method to create TTL tracker for various phoenix data model objects
+     * i.e views, view indexes ...
+     * @param env
+     * @param store
+     * @param baseTable
+     * @return
+     */
+    private TTLTracker createTTLTrackerFor(RegionCoprocessorEnvironment env,
+            Store store,  PTable baseTable) throws IOException {
+
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        String compactionTableName = env.getRegion().getRegionInfo().getTable().getNameAsString();

Review Comment:
   This code assumes that the physical table name is the same as the Phoenix
table name. This may not be true if the Phoenix table went through a
transformation due to an online data format change operation. We should check
whether a Phoenix table exists with this name. If it does not exist, then we
should query syscat using the physical table name column to find the actual
name of the Phoenix table.
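
   A hedged sketch of the lookup described above. PhoenixRuntime.getTable and
TableNotFoundException are real Phoenix APIs, but the SYSCAT column queried
here (PHYSICAL_TABLE_NAME, used by the transform feature) and the fallback
query shape are assumptions for illustration, not code from this patch:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.TableNotFoundException;
    import org.apache.phoenix.util.PhoenixRuntime;

    final class LogicalTableResolver {
        // Resolve the logical Phoenix table for a region's physical table name.
        static PTable resolve(Connection conn, String physicalName) throws SQLException {
            try {
                // Common case: the physical name and the Phoenix name are the same.
                return PhoenixRuntime.getTable(conn, physicalName);
            } catch (TableNotFoundException e) {
                // Possibly a transformed table: look up the logical name in SYSCAT
                // by the (assumed) PHYSICAL_TABLE_NAME column; TABLE_TYPE IS NOT NULL
                // restricts the scan to table header rows.
                String query = "SELECT TABLE_SCHEM, TABLE_NAME FROM SYSTEM.CATALOG"
                        + " WHERE PHYSICAL_TABLE_NAME = ? AND TABLE_TYPE IS NOT NULL";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    stmt.setString(1, physicalName);
                    try (ResultSet rs = stmt.executeQuery()) {
                        if (!rs.next()) {
                            throw e; // no matching logical table found either
                        }
                        String schema = rs.getString(1);
                        String logicalName = schema == null
                                ? rs.getString(2) : schema + "." + rs.getString(2);
                        return PhoenixRuntime.getTable(conn, logicalName);
                    }
                }
            }
        }
    }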





> Provide support for pruning expired rows of views using Phoenix level 
> compactions
> ---------------------------------------------------------------------------------
>
>                 Key: PHOENIX-7108
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7108
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Jacob Isaac
>            Assignee: Jacob Isaac
>            Priority: Major
>
> Modify Phoenix compaction framework introduced in PHOENIX-6888 to prune TTL 
> expired rows of views.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
