This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 08d924787c PHOENIX-7382 Eliminating index building and treating max
lookback as … (#1964)
08d924787c is described below
commit 08d924787cf2aabb58a829f55ef4466ac4a0008d
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Thu Sep 26 11:28:21 2024 -0700
PHOENIX-7382 Eliminating index building and treating max lookback as …
(#1964)
---
.../apache/phoenix/optimize/QueryOptimizer.java | 6 +
.../org/apache/phoenix/schema/MetaDataClient.java | 44 ++-
.../main/java/org/apache/phoenix/util/CDCUtil.java | 2 +-
.../java/org/apache/phoenix/util/QueryUtil.java | 2 +-
.../phoenix/coprocessor/CompactionScanner.java | 19 ++
.../coprocessor/GlobalIndexRegionScanner.java | 29 +-
.../coprocessor/IndexRebuildRegionScanner.java | 4 +
.../java/org/apache/phoenix/end2end/CDCBaseIT.java | 41 +--
.../apache/phoenix/end2end/CDCDefinitionIT.java | 10 +-
.../org/apache/phoenix/end2end/CDCQueryIT.java | 328 ++++++++++++++-------
.../java/org/apache/phoenix/util/TestUtil.java | 22 +-
11 files changed, 353 insertions(+), 154 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 107eccac51..8f6faa653b 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -294,6 +294,12 @@ public class QueryOptimizer {
}
for (PTable index : indexes) {
+ if (CDCUtil.isCDCIndex(index) && !forCDC) {
+ // A CDC index is allowed only for the queries on its CDC
table because a CDC index
+ // may not be built completely and may not include the index
row updates for
+ // the data table mutations outside the max lookback window
+ continue;
+ }
QueryPlan plan = addPlan(statement, translatedIndexSelect, index,
targetColumns,
parallelIteratorFactory, dataPlan, false, indexResolver);
if (plan != null &&
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index dda82437ba..166caa6a66 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1075,7 +1075,8 @@ public class MetaDataClient {
TableName tableName = statement.getTableName();
Map<String,Object> tableProps =
Maps.newHashMapWithExpectedSize(statement.getProps().size());
Map<String,Object> commonFamilyProps =
Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
- populatePropertyMaps(statement.getProps(), tableProps,
commonFamilyProps, statement.getTableType());
+ populatePropertyMaps(statement.getProps(), tableProps,
commonFamilyProps,
+ statement.getTableType(), false);
splits = processSplits(tableProps, splits);
boolean isAppendOnlySchema = false;
@@ -1231,13 +1232,14 @@ public class MetaDataClient {
* @throws SQLException
*/
private void populatePropertyMaps(ListMultimap<String,Pair<String,Object>>
statementProps, Map<String, Object> tableProps,
- Map<String, Object> commonFamilyProps, PTableType tableType)
throws SQLException {
+ Map<String, Object> commonFamilyProps, PTableType tableType,
boolean isCDCIndex) throws SQLException {
// Somewhat hacky way of determining if property is for
HColumnDescriptor or HTableDescriptor
ColumnFamilyDescriptor defaultDescriptor =
ColumnFamilyDescriptorBuilder.of(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
if (!statementProps.isEmpty()) {
Collection<Pair<String,Object>> propsList =
statementProps.get(QueryConstants.ALL_FAMILY_PROPERTIES_KEY);
for (Pair<String,Object> prop : propsList) {
- if (tableType == PTableType.INDEX &&
MetaDataUtil.propertyNotAllowedToBeOutOfSync(prop.getFirst())) {
+ if (tableType == PTableType.INDEX && !isCDCIndex &&
+
MetaDataUtil.propertyNotAllowedToBeOutOfSync(prop.getFirst())) {
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX)
.setMessage("Property: " + prop.getFirst()).build()
.buildException();
@@ -1634,7 +1636,9 @@ public class MetaDataClient {
Map<String,Object> tableProps =
Maps.newHashMapWithExpectedSize(statement.getProps().size());
Map<String,Object> commonFamilyProps =
Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
- populatePropertyMaps(statement.getProps(), tableProps,
commonFamilyProps, PTableType.INDEX);
+ populatePropertyMaps(statement.getProps(), tableProps,
commonFamilyProps, PTableType.INDEX,
+ CDCUtil.isCDCIndex(SchemaUtil
+
.getTableNameFromFullName(statement.getIndexTableName().toString())));
List<Pair<ParseNode, SortOrder>> indexParseNodeAndSortOrderList =
ik.getParseNodeAndSortOrderList();
List<ColumnName> includedColumns = statement.getIncludeColumns();
TableRef tableRef = null;
@@ -1963,16 +1967,23 @@ public class MetaDataClient {
statement.getProps().size());
Map<String, Object> commonFamilyProps =
Maps.newHashMapWithExpectedSize(
statement.getProps().size() + 1);
- populatePropertyMaps(statement.getProps(), tableProps,
commonFamilyProps, PTableType.CDC);
-
- PhoenixStatement pstmt = new PhoenixStatement(connection);
- String dataTableFullName =
SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
- statement.getDataTable().getTableName());
- String createIndexSql = "CREATE UNCOVERED INDEX " +
- (statement.isIfNotExists() ? "IF NOT EXISTS " : "") +
- "\"" +
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName()) + "\"" +
- " ON " + dataTableFullName + " (" +
PhoenixRowTimestampFunction.NAME + "()) ASYNC";
+ populatePropertyMaps(statement.getProps(), tableProps,
commonFamilyProps, PTableType.CDC,
+ false);
+ Properties props = connection.getClientInfo();
+ props.put(INDEX_CREATE_DEFAULT_STATE, "ACTIVE");
+
+ String
+ dataTableFullName =
+
SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
+ statement.getDataTable().getTableName());
+ String
+ createIndexSql =
+ "CREATE UNCOVERED INDEX " + (statement.isIfNotExists() ? "IF
NOT EXISTS " : "")
+ +
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName())
+ + " ON " + dataTableFullName + " ("
+ + PhoenixRowTimestampFunction.NAME + "()) ASYNC";
List<String> indexProps = new ArrayList<>();
+ indexProps.add("REPLICATION_SCOPE=0");
Object saltBucketNum = TableProperty.SALT_BUCKETS.getValue(tableProps);
if (saltBucketNum != null) {
indexProps.add("SALT_BUCKETS=" + saltBucketNum);
@@ -1982,9 +1993,10 @@ public class MetaDataClient {
indexProps.add("COLUMN_ENCODED_BYTES=" + columnEncodedBytes);
}
createIndexSql = createIndexSql + " " + String.join(", ", indexProps);
- try {
- pstmt.execute(createIndexSql);
- } catch (SQLException e) {
+ try (Connection internalConnection = QueryUtil.getConnection(props,
connection.getQueryServices().getConfiguration())) {
+ PhoenixStatement pstmt = new PhoenixStatement((PhoenixConnection)
internalConnection);
+ pstmt.execute(createIndexSql);
+ } catch (SQLException e) {
if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
throw new
SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
statement.getCdcObjName().getName()).setRootCause(
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 96641c2dcf..6e87121ef9 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -38,7 +38,7 @@ import org.apache.phoenix.schema.types.PDataType;
import org.bson.RawBsonDocument;
public class CDCUtil {
- public static final String CDC_INDEX_PREFIX = "__CDC__";
+ public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX";
/**
* Make a set of CDC change scope enums from the given string containing
comma separated scope
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/QueryUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 52e4f6aa0e..6edb67af11 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -424,7 +424,7 @@ public final class QueryUtil {
return getConnection(new Properties(), conf);
}
- private static Connection getConnection(Properties props, Configuration
conf)
+ public static Connection getConnection(Properties props, Configuration
conf)
throws SQLException {
String url = getConnectionUrl(props, conf);
LOGGER.info(String.format("Creating connection with the jdbc url: %s,
isServerSide = %s",
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index e96406f142..126e212104 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -70,6 +70,7 @@ import org.apache.phoenix.schema.types.PSmallint;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.coprocessorclient.RowKeyMatcher;
import org.apache.phoenix.coprocessorclient.TableTTLInfoCache;
@@ -155,6 +156,7 @@ public class CompactionScanner implements InternalScanner {
private long inputCellCount = 0;
private long outputCellCount = 0;
private boolean phoenixLevelOnly = false;
+ private boolean isCDCIndex;
// Only for forcing minor compaction while testing
private static boolean forceMinorCompaction = false;
@@ -197,6 +199,7 @@ public class CompactionScanner implements InternalScanner {
emptyCFStore = familyCount == 1 ||
columnFamilyName.equals(Bytes.toString(emptyCF))
|| localIndex;
+ isCDCIndex = table != null ? CDCUtil.isCDCIndex(table) : false;
// Initialize the tracker that computes the TTL for the compacting
table.
// The TTL tracker can be
// simple (one single TTL for the table) when the compacting table is
not Partitioned
@@ -2356,6 +2359,19 @@ public class CompactionScanner implements
InternalScanner {
}
}
+ /**
+ * For a CDC index, we retain all cells within the max lookback window,
as opposed to
+ * retaining all row versions visible through the max lookback window, as we
do for other tables.
+ */
+ private boolean retainCellsForCDCIndex(List<Cell> result, List<Cell>
retainedCells) {
+ for (Cell cell : result) {
+ if (cell.getTimestamp() >=
rowTracker.getRowContext().getMaxLookbackWindowStart()) {
+ retainedCells.add(cell);
+ }
+ }
+ return true;
+ }
+
/**
* The retained cells includes the cells that are visible through the
max lookback
* window and the additional empty column cells that are needed to
reduce large time
@@ -2366,6 +2382,9 @@ public class CompactionScanner implements InternalScanner
{
lastRowVersion.clear();
emptyColumn.clear();
+ if (isCDCIndex) {
+ return retainCellsForCDCIndex(result, retainedCells);
+ }
getLastRowVersionInMaxLookbackWindow(result, lastRowVersion,
retainedCells,
emptyColumn);
if (lastRowVersion.isEmpty()) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 1ee83005bb..ad853ef141 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -55,12 +55,15 @@ import
org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.types.PVarbinary;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -70,6 +73,7 @@ import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -77,6 +81,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -273,11 +278,31 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
verificationResultRepository =
new
IndexVerificationResultRepository(indexMaintainer.getIndexTableName(),
hTableFactory);
nextStartKey = null;
- minTimestamp = scan.getTimeRange().getMin();
}
+ computeMinTimestamp(config);
}
-
+ /**
+ * For CDC indexes, we do not need to consider rows outside the max lookback
window or before
+ * the index creation time. minTimestamp must always be computed and used for
CDC indexes,
+ * even when it is not set on the scan.
+ */
+ private void computeMinTimestamp(Configuration config) throws IOException {
+ minTimestamp = scan.getTimeRange().getMin();
+ if (indexMaintainer.isCDCIndex()) {
+ minTimestamp = EnvironmentEdgeManager.currentTimeMillis() -
maxLookBackInMills;
+ try (PhoenixConnection conn =
+
QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) {
+ PTable indexTable =
conn.getTableNoCache(indexMaintainer.getLogicalIndexName());
+ minTimestamp = Math.max(indexTable.getTimeStamp() + 1,
minTimestamp);
+ } catch (SQLException e) {
+ LOGGER.error(
+ "Unable to get the PTable for the index table "
+ + indexMaintainer.getLogicalIndexName() + " "
+ e);
+ throw new IOException(e);
+ }
+ }
+ }
public static long getTimestamp(Mutation m) {
for (List<Cell> cells : m.getFamilyCellMap().values()) {
for (Cell cell : cells) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 3de43baee2..8054ca053d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -348,6 +348,10 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
Put put = null;
Delete del = null;
for (Cell cell : row) {
+ if (cell.getTimestamp() < minTimestamp
+ && indexMaintainer.isCDCIndex()) {
+ continue;
+ }
if (cell.getType().equals(Cell.Type.Put)) {
if (familyMap != null &&
!isColumnIncluded(cell)) {
continue;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index c1ef1c399f..7e6e82ef62 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -167,20 +167,14 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
conn.createStatement().execute(table_sql);
}
- protected void createCDCAndWait(Connection conn, String tableName, String
cdcName,
- String cdc_sql) throws Exception {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null);
+ protected void createCDC(Connection conn, String cdc_sql) throws Exception
{
+ createCDC(conn, cdc_sql, null, null);
}
- protected void createCDCAndWait(Connection conn, String tableName, String
cdcName,
- String cdc_sql,
PTable.QualifierEncodingScheme encodingScheme,
- Integer nSaltBuckets) throws Exception {
+ protected void createCDC(Connection conn, String cdc_sql,
+ PTable.QualifierEncodingScheme encodingScheme, Integer
nSaltBuckets) throws Exception {
// For CDC, multitenancy gets derived automatically via the parent
table.
createTable(conn, cdc_sql, encodingScheme, false, nSaltBuckets, false,
null);
- String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
- tableName = SchemaUtil.getTableNameFromFullName(tableName);
- IndexToolIT.runIndexTool(false, schemaName, tableName,
- "\"" + CDCUtil.getCDCIndexName(cdcName) + "\"");
}
protected void assertCDCState(Connection conn, String cdcName, String
expInclude,
@@ -419,16 +413,22 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
// - with a null
// - missing columns
protected List<ChangeRow> generateChanges(long startTS, String[]
tenantids, String tableName,
- String datatableNameForDDL,
CommitAdapter committer)
+ String datatableNameForDDL, CommitAdapter committer)
throws Exception {
+ return generateChanges(startTS, tenantids, tableName,
datatableNameForDDL, committer,
+ "v3", 0);
+ }
+ protected List<ChangeRow> generateChanges(long startTS, String[]
tenantids, String tableName,
+ String datatableNameForDDL, CommitAdapter committer, String
columnToDrop, int startKey)
+ throws Exception {
List<ChangeRow> changes = new ArrayList<>();
EnvironmentEdgeManager.injectEdge(injectEdge);
injectEdge.setValue(startTS);
- boolean dropV3Done = false;
+ boolean dropColumnDone = false;
committer.init();
- Map<String, Object> rowid1 = new HashMap() {{ put("K", 1); }};
- Map<String, Object> rowid2 = new HashMap() {{ put("K", 2); }};
- Map<String, Object> rowid3 = new HashMap() {{ put("K", 3); }};
+ Map<String, Object> rowid1 = new HashMap() {{ put("K", startKey + 1);
}};
+ Map<String, Object> rowid2 = new HashMap() {{ put("K", startKey + 2);
}};
+ Map<String, Object> rowid3 = new HashMap() {{ put("K", startKey + 3);
}};
for (String tid: tenantids) {
try (Connection conn = committer.getConnection(tid)) {
changes.add(addChange(conn, tableName,
@@ -445,7 +445,6 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
}})
));
committer.commit(conn);
-
changes.add(addChange(conn, tableName, new ChangeRow(tid,
startTS += 100,
rowid3, new TreeMap<String, Object>() {{
put("V1", 300L);
@@ -462,12 +461,12 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
));
committer.commit(conn);
}
- if (datatableNameForDDL != null && !dropV3Done) {
+ if (datatableNameForDDL != null && !dropColumnDone && columnToDrop
!= null) {
try (Connection conn = newConnection()) {
conn.createStatement().execute("ALTER TABLE " +
datatableNameForDDL +
" DROP COLUMN v3");
}
- dropV3Done = true;
+ dropColumnDone = true;
}
try (Connection conn = newConnection(tid)) {
changes.add(addChange(conn, tableName, new ChangeRow(tid,
startTS += 100, rowid1,
@@ -518,7 +517,6 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
}})
));
committer.commit(conn);
-
changes.add(addChange(conn, tableName, new ChangeRow(tid,
startTS += 100, rowid1,
null)));
committer.commit(conn);
@@ -528,6 +526,7 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
return changes;
}
+
protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String
dataTableName,
Map<String, String> dataColumns,
List<ChangeRow> changes,
Set<PTable.CDCChangeScope>
changeScopes) throws Exception {
@@ -746,6 +745,10 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
return change == null ? CDC_DELETE_EVENT_TYPE :
CDC_UPSERT_EVENT_TYPE;
}
+ public long getTimestamp() {
+ return changeTS;
+ }
+
ChangeRow(String tenantid, long changeTS, Map<String, Object> pks,
Map<String, Object> change) {
this.tenantId = tenantid;
this.changeTS = changeTS;
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index 29f6e41d0a..21dae3bf70 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -87,7 +87,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
}
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null);
+ createCDC(conn, cdc_sql, null, null);
assertCDCState(conn, cdcName, null, 3);
assertNoResults(conn, cdcName);
@@ -104,7 +104,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE
(pre, post)";
- createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+ createCDC(conn, cdc_sql);
assertCDCState(conn, cdcName, PRE+","+POST, 3);
assertPTable(cdcName, new HashSet<>(
Arrays.asList(PRE, POST)), tableName, datatableName);
@@ -140,7 +140,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
String cdcName = generateUniqueName();
String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDCAndWait(conn, tableName, cdcName, cdc_sql, null,
+ createCDC(conn, cdc_sql, null,
saltingConfig[1]);
try {
assertCDCState(conn, cdcName, null, 3);
@@ -186,7 +186,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
}
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+ createCDC(conn, cdc_sql);
assertCDCState(conn, cdcName, null, 3);
assertPTable(cdcName, null, tableName, datatableName);
}
@@ -306,7 +306,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
}
String cdcName = generateUniqueName();
String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+ createCDC(conn, cdc_sql);
try {
conn.createStatement().executeQuery("SELECT " +
"/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 8f675d9d90..a1b99e90b2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -18,13 +18,21 @@
package org.apache.phoenix.end2end;
import org.apache.hadoop.hbase.TableName;
-import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -36,7 +44,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,8 +57,14 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
+import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT;
+import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT;
+import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
-import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
import static
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS;
import static org.junit.Assert.assertEquals;
@@ -65,25 +78,24 @@ import static org.junit.Assert.assertTrue;
// "".isEmpty();
// }
@RunWith(Parameterized.class)
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
public class CDCQueryIT extends CDCBaseIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(CDCQueryIT.class);
+ private static final int MAX_LOOKBACK_AGE = 10; // seconds
// Offset of the first column, depending on whether
PHOENIX_ROW_TIMESTAMP() is in the schema
// or not.
private final boolean forView;
- private final boolean dataBeforeCDC;
private final PTable.QualifierEncodingScheme encodingScheme;
private final boolean multitenant;
private final Integer indexSaltBuckets;
private final Integer tableSaltBuckets;
private final boolean withSchemaName;
- public CDCQueryIT(Boolean forView, Boolean dataBeforeCDC,
+ public CDCQueryIT(Boolean forView,
PTable.QualifierEncodingScheme encodingScheme, boolean
multitenant,
Integer indexSaltBuckets, Integer tableSaltBuckets,
boolean withSchemaName) {
this.forView = forView;
- this.dataBeforeCDC = dataBeforeCDC;
this.encodingScheme = encodingScheme;
this.multitenant = multitenant;
this.indexSaltBuckets = indexSaltBuckets;
@@ -91,26 +103,27 @@ public class CDCQueryIT extends CDCBaseIT {
this.withSchemaName = withSchemaName;
}
- @Parameterized.Parameters(name = "forView={0} dataBeforeCDC={1},
encodingScheme={2}, " +
- "multitenant={3}, indexSaltBuckets={4}, tableSaltBuckets={5}
withSchemaName=${6}")
+ @Parameterized.Parameters(name = "forView={0}, encodingScheme={1}, " +
+ "multitenant={2}, indexSaltBuckets={3}, tableSaltBuckets={4}
withSchemaName=${5}")
public static synchronized Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { Boolean.FALSE, Boolean.FALSE, TWO_BYTE_QUALIFIERS,
Boolean.FALSE, null, null,
- Boolean.FALSE },
- { Boolean.FALSE, Boolean.TRUE, TWO_BYTE_QUALIFIERS,
Boolean.FALSE, null, null,
- Boolean.TRUE },
- { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS,
Boolean.FALSE, 1, 1,
- Boolean.FALSE },
+ { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null,
null, Boolean.FALSE },
+ { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null,
null, Boolean.TRUE },
+ { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1,
Boolean.FALSE },
// Once PHOENIX-7239 is resolved, change this to have different salt
buckets for data and index.
- { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS,
Boolean.TRUE, 1, 1,
- Boolean.TRUE },
- { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS,
Boolean.FALSE, 4, null,
- Boolean.FALSE },
- { Boolean.TRUE, Boolean.FALSE, TWO_BYTE_QUALIFIERS,
Boolean.FALSE, null, null,
- Boolean.FALSE },
+ { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1,
Boolean.TRUE },
+ { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4,
null, Boolean.FALSE },
+ { Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null,
null, Boolean.FALSE },
});
}
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Integer.toString(MAX_LOOKBACK_AGE));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
@Before
public void beforeTest(){
EnvironmentEdgeManager.reset();
@@ -118,6 +131,13 @@ public class CDCQueryIT extends CDCBaseIT {
injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
}
+ private void cdcIndexShouldNotBeUsedForDataTableQueries(Connection conn,
String dataTableName,
+ String cdcName) throws Exception {
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT *
FROM " + dataTableName
+ + " WHERE PHOENIX_ROW_TIMESTAMP() < CURRENT_TIME()");
+ String explainPlan = QueryUtil.getExplainPlan(rs);
+ assertFalse(explainPlan.contains(cdcName));
+ }
@Test
public void testSelectCDC() throws Exception {
String cdcName, cdc_sql;
@@ -138,10 +158,8 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- if (!dataBeforeCDC) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
+ createCDC(conn, cdc_sql, encodingScheme,
indexSaltBuckets);
- }
}
String tenantId = multitenant ? "1000" : null;
@@ -154,17 +172,6 @@ public class CDCQueryIT extends CDCBaseIT {
List<ChangeRow> changes = generateChanges(startTS, tenantids,
tableName, null,
COMMIT_SUCCESS);
- if (dataBeforeCDC) {
- try (Connection conn = newConnection()) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
- // Testing with flushed data adds more coverage.
- getUtility().getAdmin().flush(TableName.valueOf(datatableName));
-
getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
- CDCUtil.getCDCIndexName(cdcName))));
- }
-
//SingleCellIndexIT.dumpTable(tableName);
//SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
@@ -234,6 +241,7 @@ public class CDCQueryIT extends CDCBaseIT {
assertEquals(false, rs.next());
}
}
+ cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,
cdcName);
}
}
@@ -271,10 +279,7 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE
(change)";
- if (!dataBeforeCDC) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
+ createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
}
String tenantId = multitenant ? "1000" : null;
@@ -307,17 +312,6 @@ public class CDCQueryIT extends CDCBaseIT {
//SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
//LOGGER.debug("----------");
- if (dataBeforeCDC) {
- try (Connection conn = newConnection()) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
- // Testing with flushed data adds more coverage.
- getUtility().getAdmin().flush(TableName.valueOf(datatableName));
-
getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
- CDCUtil.getCDCIndexName(cdcName))));
- }
-
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
// For debug: uncomment to see the exact results logged to console.
@@ -351,6 +345,7 @@ public class CDCQueryIT extends CDCBaseIT {
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
"SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ *
FROM " + cdcFullName),
datatableName, dataColumns, changes, ALL_IMG);
+ cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,
cdcName);
}
}
@@ -386,10 +381,8 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- if (!dataBeforeCDC) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
+
+ createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
}
String tenantId = multitenant ? "1000" : null;
@@ -402,17 +395,6 @@ public class CDCQueryIT extends CDCBaseIT {
List<ChangeRow> changes = generateChangesImmutableTable(startTS,
tenantids, tableName,
COMMIT_SUCCESS);
- if (dataBeforeCDC) {
- try (Connection conn = newConnection()) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
- // Testing with flushed data adds more coverage.
- getUtility().getAdmin().flush(TableName.valueOf(datatableName));
-
getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
- CDCUtil.getCDCIndexName(cdcName))));
- }
-
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
Map<String, String> dataColumns = new TreeMap<String, String>() {{
put("V1", "INTEGER");
@@ -439,6 +421,7 @@ public class CDCQueryIT extends CDCBaseIT {
verifyChangesViaSCN(tenantId,
conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ " +
"PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " +
cdcFullName),
datatableName, dataColumns, changes, CHANGE_IMG);
+ cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,
cdcName);
}
}
@@ -471,10 +454,8 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- if (!dataBeforeCDC) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
+ createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ cdcIndexShouldNotBeUsedForDataTableQueries(conn,
tableName,cdcName);
}
EnvironmentEdgeManager.injectEdge(injectEdge);
@@ -540,13 +521,6 @@ public class CDCQueryIT extends CDCBaseIT {
Timestamp ts4 = new Timestamp(cal.getTime().getTime());
EnvironmentEdgeManager.reset();
- if (dataBeforeCDC) {
- try (Connection conn = newConnection()) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
- }
-
//SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
@@ -623,10 +597,7 @@ public class CDCQueryIT extends CDCBaseIT {
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- if (!dataBeforeCDC) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
+ createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
conn.createStatement().execute("ALTER TABLE " + datatableName + "
DROP COLUMN v0");
}
@@ -640,17 +611,6 @@ public class CDCQueryIT extends CDCBaseIT {
List<ChangeRow> changes = generateChanges(startTS, tenantids,
tableName, datatableName,
COMMIT_SUCCESS);
- if (dataBeforeCDC) {
- try (Connection conn = newConnection()) {
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme,
- indexSaltBuckets);
- }
- // Testing with flushed data adds more coverage.
- getUtility().getAdmin().flush(TableName.valueOf(datatableName));
-
getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
- CDCUtil.getCDCIndexName(cdcName))));
- }
-
Map<String, String> dataColumns = new TreeMap<String, String>() {{
put("V0", "INTEGER");
put("V1", "INTEGER");
@@ -664,16 +624,12 @@ public class CDCQueryIT extends CDCBaseIT {
"SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " +
SchemaUtil.getTableName(
schemaName, cdcName)),
datatableName, dataColumns, changes, CHANGE_IMG);
+ cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,
cdcName);
}
}
@Test
public void testSelectCDCFailDataTableUpdate() throws Exception {
- if (dataBeforeCDC == true) {
- // In this case, index will not exist at the time of upsert, so we
can't simulate the
- // index failure.
- return;
- }
String schemaName = withSchemaName ? generateUniqueName() : null;
String tableName = SchemaUtil.getTableName(schemaName,
generateUniqueName());
String cdcName, cdc_sql;
@@ -692,7 +648,8 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDCAndWait(conn, tableName, cdcName, cdc_sql,
encodingScheme, indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,
cdcName);
}
String tenantId = multitenant ? "1000" : null;
@@ -702,13 +659,186 @@ public class CDCQueryIT extends CDCBaseIT {
}
long startTS = System.currentTimeMillis();
- generateChanges(startTS, tenantids, tableName, null,
- COMMIT_FAILURE_EXPECTED);
+ generateChanges(startTS, tenantids, tableName, null,
COMMIT_FAILURE_EXPECTED);
try (Connection conn = newConnection(tenantId)) {
ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM
" +
SchemaUtil.getTableName(schemaName, cdcName));
assertEquals(false, rs.next());
+
+ }
+ }
+
+ @Test
+ public void testCDCIndexBuildAndVerification() throws Exception {
+ String schemaName = withSchemaName ? generateUniqueName() : null;
+ String tableName = generateUniqueName();
+ String tableFullName = SchemaUtil.getTableName(schemaName, tableName);
+ String cdcName, cdc_sql;
+ try (Connection conn = newConnection()) {
+ // Create a table and add some rows
+ createTable(conn, "CREATE TABLE " + tableFullName + " (" +
(multitenant ?
+ "TENANT_ID CHAR(5) NOT NULL, " :
+ "")
+ + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2
INTEGER, B.vb INTEGER, "
+ + "v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (multitenant ?
+ "(TENANT_ID, k) " :
+ "(k)") + ")", encodingScheme, multitenant,
tableSaltBuckets, false, null);
+ if (forView) {
+ String viewName = generateUniqueName();
+ String viewFullName = SchemaUtil.getTableName(schemaName,
viewName);
+ createTable(conn, "CREATE VIEW " + viewFullName + " AS SELECT
* FROM " + tableFullName,
+ encodingScheme);
+ tableName = viewName;
+ tableFullName = viewFullName;
+ }
+
+ String tenantId = multitenant ? "1000" : null;
+ String[] tenantids = { tenantId };
+ if (multitenant) {
+ tenantids = new String[] { tenantId, "2000" };
+ }
+
+ long startTS = System.currentTimeMillis();
+ List<ChangeRow> changes = generateChanges(startTS, tenantids,
tableFullName,
+ tableFullName, COMMIT_SUCCESS, null, 0);
+ // Make sure the timestamps of the mutations are not in the future
+ long currentTime = System.currentTimeMillis();
+ long nextTime = changes.get(changes.size() - 1).getTimestamp() + 1;
+ if (nextTime > currentTime) {
+ Thread.sleep(nextTime - currentTime);
+ }
+ // Create a CDC table
+ cdcName = generateUniqueName();
+ cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
+ createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ // Check CDC index is active but empty
+ String indexTableFullName = SchemaUtil.getTableName(schemaName,
+ CDCUtil.getCDCIndexName(cdcName));
+ PTable indexTable = ((PhoenixConnection)
conn).getTableNoCache(indexTableFullName);
+ assertEquals(indexTable.getIndexState(), PIndexState.ACTIVE);
+ TestUtil.assertRawRowCount(conn,
+
TableName.valueOf(indexTable.getPhysicalName().getString()),0);
+ // Rebuild the index and verify that it is still empty
+ IndexToolIT.runIndexTool(false, schemaName, tableName,
+ CDCUtil.getCDCIndexName(cdcName));
+ TestUtil.assertRawRowCount(conn,
+
TableName.valueOf(indexTable.getPhysicalName().getString()),0);
+ // Add more rows
+ startTS = System.currentTimeMillis();
+ changes = generateChanges(startTS, tenantids, tableFullName,
+ tableFullName, COMMIT_SUCCESS, null, 1);
+ currentTime = System.currentTimeMillis();
+ // Make sure the timestamps of the new mutations are not in the
future
+ nextTime = changes.get(changes.size() - 1).getTimestamp() + 1;
+ if (nextTime > currentTime) {
+ Thread.sleep(nextTime - currentTime);
+ }
+ // Verify that CDC index verification passes
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName,
tableName,
+ CDCUtil.getCDCIndexName(cdcName), null, 0,
IndexTool.IndexVerifyType.ONLY);
+ assertEquals(0,
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+
+ }
+ }
+
+ @Test
+ public void testCDCIndexTTLEqualsToMaxLookbackAge() throws Exception {
+ if (forView) {
+ // This test is skipped for views
+ return;
+ }
+ String schemaName = withSchemaName ? generateUniqueName() : null;
+ String tableName = generateUniqueName();
+ String tableFullName = SchemaUtil.getTableName(schemaName, tableName);
+ String cdcName, cdc_sql;
+ try (Connection conn = newConnection()) {
+ // Create a table
+ createTable(conn, "CREATE TABLE " + tableFullName + " (" +
(multitenant ?
+ "TENANT_ID CHAR(5) NOT NULL, " :
+ "")
+ + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2
INTEGER, B.vb INTEGER, "
+ + "v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (multitenant ?
+ "(TENANT_ID, k) " :
+ "(k)") + ")", encodingScheme, multitenant,
tableSaltBuckets, false, null);
+ if (forView) {
+ String viewName = generateUniqueName();
+ String viewFullName = SchemaUtil.getTableName(schemaName,
viewName);
+ createTable(conn, "CREATE VIEW " + viewFullName + " AS SELECT
* FROM " + tableFullName,
+ encodingScheme);
+ tableName = viewName;
+ tableFullName = viewFullName;
+ }
+
+ String tenantId = multitenant ? "1000" : null;
+ String[] tenantids = { tenantId };
+ if (multitenant) {
+ tenantids = new String[] { tenantId, "2000" };
+ }
+
+ // Create a CDC table
+ cdcName = generateUniqueName();
+ cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
+ createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ // Add rows
+ long startTS = System.currentTimeMillis();
+ List<ChangeRow> changes = generateChanges(startTS, tenantids,
tableFullName,
+ tableFullName, COMMIT_SUCCESS, null, 0);
+ String indexTableFullName = SchemaUtil.getTableName(schemaName,
+ CDCUtil.getCDCIndexName(cdcName));
+ PTable indexTable = ((PhoenixConnection)
conn).getTableNoCache(indexTableFullName);
+ String indexTablePhysicalName =
indexTable.getPhysicalName().toString();
+ int expectedRawRowCount = TestUtil.getRawRowCount(conn,
+ TableName.valueOf(indexTablePhysicalName));
+ long currentTime = System.currentTimeMillis();
+ // Advance time by the max lookback age. This will cause all rows
to expire
+ long nextTime = changes.get(changes.size() - 1).getTimestamp()
+ + MAX_LOOKBACK_AGE * 1000 + 1;
+ if (nextTime > currentTime) {
+ Thread.sleep(nextTime - currentTime);
+ }
+ // Major compact the CDC index. This will remove all expired rows
+ TestUtil.doMajorCompaction(conn, indexTablePhysicalName);
+ // Check CDC index is empty
+ TestUtil.assertRawRowCount(conn,
TableName.valueOf(indexTablePhysicalName),0);
+ // Rebuild the index and verify that it is still empty
+ IndexToolIT.runIndexTool(false, schemaName, tableName,
+ CDCUtil.getCDCIndexName(cdcName));
+ TestUtil.assertRawRowCount(conn,
TableName.valueOf(indexTablePhysicalName),0);
+ // This time we test that we only keep the row versions within the max
lookback window
+ startTS = System.currentTimeMillis();
+ // Add the first set of rows
+ changes = generateChanges(startTS, tenantids, tableFullName,
+ tableFullName, COMMIT_SUCCESS, null, 0);
+ // Advance time by the max lookback age. This will cause the first
set of rows to expire
+ startTS = changes.get(changes.size() - 1).getTimestamp()
+ + MAX_LOOKBACK_AGE * 1000 + 1;
+ // Add another set of changes
+ changes = generateChanges(startTS, tenantids, tableFullName,
+ tableFullName, COMMIT_SUCCESS, null, 10);
+ nextTime = changes.get(changes.size() - 1).getTimestamp() + 1;
+ // Major compact the CDC index, which removes all expired rows, that
is,
+ // the first set of rows
+ currentTime = System.currentTimeMillis();
+ if (nextTime > currentTime) {
+ Thread.sleep(nextTime - currentTime);
+ }
+ TestUtil.doMajorCompaction(conn, indexTablePhysicalName);
+ // Check the CDC index has only the second set of rows
+ TestUtil.assertRawRowCount(conn,
TableName.valueOf(indexTablePhysicalName),
+ expectedRawRowCount);
+ // Rebuild the index and verify that it still has the same number
of rows
+ IndexToolIT.runIndexTool(false, schemaName, tableName,
+ CDCUtil.getCDCIndexName(cdcName));
+ TestUtil.assertRawRowCount(conn,
TableName.valueOf(indexTablePhysicalName),
+ expectedRawRowCount);
+
}
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 1845f01a62..6ea2a2eb65 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -954,11 +954,12 @@ public class TestUtil {
int cellCount = 0;
int rowCount = 0;
try (ResultScanner scanner = table.getScanner(s)) {
- Result result = null;
+ Result result;
while ((result = scanner.next()) != null) {
rowCount++;
+ System.out.println("Row count: " + rowCount);
CellScanner cellScanner = result.cellScanner();
- Cell current = null;
+ Cell current;
while (cellScanner.advance()) {
current = cellScanner.current();
System.out.println(current + " column= " +
@@ -972,24 +973,17 @@ public class TestUtil {
}
public static int getRawRowCount(Table table) throws IOException {
+ dumpTable(table);
return getRowCount(table, true);
}
public static int getRowCount(Table table, boolean isRaw) throws
IOException {
Scan s = new Scan();
s.setRaw(isRaw);
- ;
- s.readAllVersions();
int rows = 0;
try (ResultScanner scanner = table.getScanner(s)) {
- Result result = null;
- while ((result = scanner.next()) != null) {
+ while (scanner.next() != null) {
rows++;
- CellScanner cellScanner = result.cellScanner();
- Cell current = null;
- while (cellScanner.advance()) {
- current = cellScanner.current();
- }
}
}
return rows;
@@ -1390,6 +1384,12 @@ public class TestUtil {
assertEquals(expectedRowCount, count);
}
+ public static int getRawRowCount(Connection conn, TableName table)
+ throws SQLException, IOException {
+ ConnectionQueryServices cqs =
conn.unwrap(PhoenixConnection.class).getQueryServices();
+ return TestUtil.getRawRowCount(cqs.getTable(table.getName()));
+ }
+
public static int getRawCellCount(Connection conn, TableName tableName,
byte[] row)
throws SQLException, IOException {
ConnectionQueryServices cqs =
conn.unwrap(PhoenixConnection.class).getQueryServices();