kadirozde commented on code in PR #1799:
URL: https://github.com/apache/phoenix/pull/1799#discussion_r1491637691
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -84,45 +110,99 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
                              Store store,
                              InternalScanner storeScanner,
                              long maxLookbackInMillis,
-                             byte[] emptyCF,
-                             byte[] emptyCQ,
-                             int phoenixTTL,
-                             boolean isSystemTable) {
+                             PTable table) throws IOException {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
         this.store = store;
         this.env = env;
-        this.emptyCF = emptyCF;
-        this.emptyCQ = emptyCQ;
+        this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        this.emptyCQ = table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                QueryConstants.EMPTY_COLUMN_BYTES : table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
         this.config = env.getConfiguration();
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.maxLookbackInMillis = maxLookbackInMillis;
         String columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         String tableName = region.getRegionInfo().getTable().getNameAsString();
         Long overriddenMaxLookback =
                 maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
         maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
+
+        this.maxLookbackInMillis = maxLookbackInMillis;
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
         // for scn queries [0, scn). This means that the maxlookback size should be
         // maxLookbackInMillis + 1 so that the oldest scn does not return empty row
-        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
-                compactionTime : compactionTime - (maxLookbackInMillis + 1);
+        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ? compactionTime : compactionTime - (maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        this.ttl = isSystemTable ? cfd.getTimeToLive() : phoenixTTL;
-        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
-        ttl *= 1000;
-        this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
         this.minVersion = cfd.getMinVersions();
         this.maxVersion = cfd.getMaxVersions();
         this.keepDeletedCells = cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
-                || localIndex;
-        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
-        hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) || localIndex;
+        // TODO: check if is it appropriate to throw an IOException here
Review Comment:
Is this TODO still relevant?
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -160,17 +240,390 @@ public void close() throws IOException {
         storeScanner.close();
     }
+    private enum MatcherType {
+        VIEW_INDEXES, GLOBAL_VIEWS, TENANT_VIEWS
+    }
+
+    private interface TTLTracker {
+        void setTTL(Cell firstCell);
+        RowContext getRowContext();
+    }
+    private class NonPartitionedTableTTLTracker implements TTLTracker {
+
+        private long ttl;
+        private RowContext rowContext;
+
+        public NonPartitionedTableTTLTracker(PTable pTable, RegionCoprocessorEnvironment env, Store store) {
+            boolean isSystemTable = pTable.getType() == PTableType.SYSTEM;
+            if (isSystemTable) {
+                ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
Review Comment:
Can we have a config param, say, phoenix.system.catalog.ttl, to specify the
ttl for the system table? We want to eliminate HBase TTL for all tables.
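As a rough sketch of this suggestion (the property name `phoenix.system.catalog.ttl` and the fallback behavior are assumptions for illustration, not part of this PR), the tracker could consult the configuration before falling back to the column family descriptor:

```java
// Hypothetical sketch only: prefer a Phoenix-level config property for the SYSTEM
// table TTL, falling back to the HBase column-family TTL (in seconds) when unset.
private long resolveSystemTableTTL(RegionCoprocessorEnvironment env, Store store) {
    ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
    // "phoenix.system.catalog.ttl" is an assumed property name taken from this review comment.
    return env.getConfiguration().getLong("phoenix.system.catalog.ttl", cfd.getTimeToLive());
}
```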
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -84,45 +110,99 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
                              Store store,
                              InternalScanner storeScanner,
                              long maxLookbackInMillis,
-                             byte[] emptyCF,
-                             byte[] emptyCQ,
-                             int phoenixTTL,
-                             boolean isSystemTable) {
+                             PTable table) throws IOException {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
         this.store = store;
         this.env = env;
-        this.emptyCF = emptyCF;
-        this.emptyCQ = emptyCQ;
+        this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        this.emptyCQ = table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                QueryConstants.EMPTY_COLUMN_BYTES : table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
         this.config = env.getConfiguration();
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.maxLookbackInMillis = maxLookbackInMillis;
         String columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         String tableName = region.getRegionInfo().getTable().getNameAsString();
         Long overriddenMaxLookback =
                 maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
         maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
+
+        this.maxLookbackInMillis = maxLookbackInMillis;
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
         // for scn queries [0, scn). This means that the maxlookback size should be
         // maxLookbackInMillis + 1 so that the oldest scn does not return empty row
-        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
-                compactionTime : compactionTime - (maxLookbackInMillis + 1);
+        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ? compactionTime : compactionTime - (maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        this.ttl = isSystemTable ? cfd.getTimeToLive() : phoenixTTL;
-        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
-        ttl *= 1000;
-        this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
         this.minVersion = cfd.getMinVersions();
         this.maxVersion = cfd.getMaxVersions();
         this.keepDeletedCells = cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
-                || localIndex;
-        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
-        hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) || localIndex;
+        // TODO: check if is it appropriate to throw an IOException here
+        TTLTracker ttlTracker = createTTLTrackerFor(env, store, table);
+        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(ttlTracker);
+        hBaseLevelRowCompactor = new HBaseLevelRowCompactor(ttlTracker);
+        LOGGER.info(String.format("CompactionScanner params:- (" +
+                        "physical-data-tablename = %s, compaction-tablename = %s, " +
+                        "emptyCF = %s, emptyCQ = %s, " +
+                        "minVersion = %d, maxVersion = %d, keepDeletedCells = %s, " +
+                        "familyCount = %d, localIndex = %s, emptyCFStore = %s, " +
+                        "compactionTime = %d, maxLookbackWindowStart = %d, maxLookbackInMillis = %d)",
+                table.getName().toString(), tableName,
+                Bytes.toString(this.emptyCF), Bytes.toString(emptyCQ),
+                this.minVersion, this.maxVersion, this.keepDeletedCells.name(),
+                this.familyCount, this.localIndex, this.emptyCFStore,
+                compactionTime, maxLookbackWindowStart, maxLookbackInMillis));
+
+    }
+
+    /**
+     * Helper method to create TTL tracker for various phoenix data model objects
+     * i.e views, view indexes ...
+     * @param env
+     * @param store
+     * @param baseTable
+     * @return
+     */
+    private TTLTracker createTTLTrackerFor(RegionCoprocessorEnvironment env,
+            Store store, PTable baseTable) throws IOException {
+
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        String compactionTableName = env.getRegion().getRegionInfo().getTable().getNameAsString();
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(baseTable.getName().toString());
+        String tableName = SchemaUtil.getTableNameFromFullName(baseTable.getName().toString());
+
+        boolean isPartitionedIndexTable = false;
+        if (compactionTableName.startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
+            isPartitionedIndexTable = true;
+        }
+
+        // NonPartitioned: Salt bucket property can be separately set for base tables and indexes.
+        // Partitioned: Salt bucket property can be set only for the base table.
+        // Global views, Tenant views, view indexes inherit the salt bucket property from their
+        // base table.
+        boolean isSalted = baseTable.getBucketNum() != null;
+        try (PhoenixConnection serverConnection = QueryUtil.getConnectionOnServer(new Properties(),
+                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+
+            Table childLinkHTable = serverConnection.getQueryServices().getTable(SYSTEM_CHILD_LINK_NAME_BYTES);
+            // If there is atleast one child view for this table then it is a partitioned table.
+            boolean isPartitioned = ViewUtil.hasChildViews(
+                    childLinkHTable,
+                    EMPTY_BYTE_ARRAY,
+                    Bytes.toBytes(schemaName),
+                    Bytes.toBytes(tableName),
+                    currentTime);
+
+            return isPartitioned ?
+                    new PartitionedTableTTLTracker(baseTable, env, isSalted, isPartitionedIndexTable) :
Review Comment:
View indexes are partitioned tables, but they do not have any children in the
child link table. This code does not handle them properly, does it?
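One possible shape for that handling, sketched from the code quoted above (the NonPartitionedTableTTLTracker fallback and its arguments are taken from elsewhere in the PR and are assumptions here): short-circuit on the view-index prefix flag so the child-link lookup is not the only signal for partitioning.

```java
// Hypothetical sketch: a physical view index table (MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)
// has no rows in SYSTEM.CHILD_LINK, so treat it as partitioned regardless of the lookup.
boolean isPartitioned = isPartitionedIndexTable
        || ViewUtil.hasChildViews(childLinkHTable, EMPTY_BYTE_ARRAY,
                Bytes.toBytes(schemaName), Bytes.toBytes(tableName), currentTime);
return isPartitioned
        ? new PartitionedTableTTLTracker(baseTable, env, isSalted, isPartitionedIndexTable)
        : new NonPartitionedTableTTLTracker(baseTable, env, store);
```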
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -84,45 +110,99 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
                              Store store,
                              InternalScanner storeScanner,
                              long maxLookbackInMillis,
-                             byte[] emptyCF,
-                             byte[] emptyCQ,
-                             int phoenixTTL,
-                             boolean isSystemTable) {
+                             PTable table) throws IOException {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
         this.store = store;
         this.env = env;
-        this.emptyCF = emptyCF;
-        this.emptyCQ = emptyCQ;
+        this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        this.emptyCQ = table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                QueryConstants.EMPTY_COLUMN_BYTES : table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
         this.config = env.getConfiguration();
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.maxLookbackInMillis = maxLookbackInMillis;
         String columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         String tableName = region.getRegionInfo().getTable().getNameAsString();
         Long overriddenMaxLookback =
                 maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
         maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
+
+        this.maxLookbackInMillis = maxLookbackInMillis;
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
         // for scn queries [0, scn). This means that the maxlookback size should be
         // maxLookbackInMillis + 1 so that the oldest scn does not return empty row
-        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
-                compactionTime : compactionTime - (maxLookbackInMillis + 1);
+        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ? compactionTime : compactionTime - (maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        this.ttl = isSystemTable ? cfd.getTimeToLive() : phoenixTTL;
-        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
-        ttl *= 1000;
-        this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
         this.minVersion = cfd.getMinVersions();
         this.maxVersion = cfd.getMaxVersions();
         this.keepDeletedCells = cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
-                || localIndex;
-        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
-        hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) || localIndex;
+        // TODO: check if is it appropriate to throw an IOException here
+        TTLTracker ttlTracker = createTTLTrackerFor(env, store, table);
+        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(ttlTracker);
+        hBaseLevelRowCompactor = new HBaseLevelRowCompactor(ttlTracker);
+        LOGGER.info(String.format("CompactionScanner params:- (" +
+                        "physical-data-tablename = %s, compaction-tablename = %s, " +
+                        "emptyCF = %s, emptyCQ = %s, " +
+                        "minVersion = %d, maxVersion = %d, keepDeletedCells = %s, " +
+                        "familyCount = %d, localIndex = %s, emptyCFStore = %s, " +
+                        "compactionTime = %d, maxLookbackWindowStart = %d, maxLookbackInMillis = %d)",
+                table.getName().toString(), tableName,
+                Bytes.toString(this.emptyCF), Bytes.toString(emptyCQ),
+                this.minVersion, this.maxVersion, this.keepDeletedCells.name(),
+                this.familyCount, this.localIndex, this.emptyCFStore,
+                compactionTime, maxLookbackWindowStart, maxLookbackInMillis));
+
+    }
+
+    /**
+     * Helper method to create TTL tracker for various phoenix data model objects
+     * i.e views, view indexes ...
+     * @param env
+     * @param store
+     * @param baseTable
+     * @return
+     */
+    private TTLTracker createTTLTrackerFor(RegionCoprocessorEnvironment env,
+            Store store, PTable baseTable) throws IOException {
+
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        String compactionTableName = env.getRegion().getRegionInfo().getTable().getNameAsString();
Review Comment:
This code assumes that the physical table name is the same as the Phoenix
table name. This may not be true if the Phoenix table went through a
transformation due to an online data format change operation. We should check
whether a Phoenix table exists with this name; if it does not, we should query
SYSCAT using the physical table name column to find the actual name of the
Phoenix table.
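A hedged sketch of that lookup, assuming SYSTEM.CATALOG exposes a PHYSICAL_TABLE_NAME column as populated by the transform feature (the helper name, the filter, and the fallback are illustrative, not this PR's code):

```java
// Hypothetical sketch: map a physical (post-transform) table name back to the logical
// Phoenix table name via SYSTEM.CATALOG, keeping the physical name when no row matches.
private static String resolveLogicalTableName(PhoenixConnection conn, String physicalName)
        throws SQLException {
    String sql = "SELECT TABLE_SCHEM, TABLE_NAME FROM SYSTEM.CATALOG "
            + "WHERE PHYSICAL_TABLE_NAME = ? AND TABLE_TYPE IS NOT NULL LIMIT 1";
    try (PreparedStatement stmt = conn.prepareStatement(sql)) {
        stmt.setString(1, physicalName);
        try (ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
                return SchemaUtil.getTableName(rs.getString(1), rs.getString(2));
            }
        }
    }
    return physicalName; // no transform record found; assume the names match
}
```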
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]