This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new f3eb74473a PHOENIX-7409: Fix the bug causing an incorrect salt bucket number to be used for mutations generated in the CDC index. (#1989)
f3eb74473a is described below
commit f3eb74473afd44a54d8aa102c874eeb460199de3
Author: Hari Krishna Dara <[email protected]>
AuthorDate: Fri Oct 4 19:28:13 2024 +0530
PHOENIX-7409: Fix the bug causing an incorrect salt bucket number to be used for mutations generated in the CDC index. (#1989)
- Revamped the time range test and included coverage for the special CDC DF markers. Also refactored some debug code.
- Fixed issues with the debug logging code.
- Uncommented debug code to help diagnose the occasional flaky tests (flappers) we are seeing.
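
For context before the diff: a salted Phoenix table prepends a salt byte to every row key, computed as a hash of the row key modulo the table's SALT_BUCKETS. The one-line MutationState fix below switches newRow() from the data table's PTable to the CDC index's PTable, so the salt byte of an index mutation is derived from the index's own bucket count; when the two tables are declared with different bucket counts, the old code could salt the mutation into the wrong bucket (and hence the wrong region). A minimal, self-contained sketch of that failure mode; the class, method, and hash below are illustrative stand-ins, not Phoenix internals:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class SaltBucketSketch {
        // A Phoenix-style salt byte: hash(row key) modulo the bucket count of
        // the table the row is written to.
        static byte saltByte(byte[] rowKey, int bucketCount) {
            int hash = Arrays.hashCode(rowKey); // stand-in for Phoenix's hash
            return (byte) ((hash & Integer.MAX_VALUE) % bucketCount);
        }

        public static void main(String[] args) {
            byte[] cdcIndexRowKey = "ts\u0000pk1".getBytes(StandardCharsets.UTF_8);
            int dataTableBuckets = 2; // data table SALT_BUCKETS
            int cdcIndexBuckets = 4;  // CDC index SALT_BUCKETS

            // Old behavior (table.newRow): the index row key is salted with
            // the data table's bucket count...
            byte viaDataTable = saltByte(cdcIndexRowKey, dataTableBuckets);
            // ...fixed behavior (index.newRow): salted with the CDC index's
            // own bucket count, matching the region the row actually lives in.
            byte viaIndex = saltByte(cdcIndexRowKey, cdcIndexBuckets);

            System.out.println("salt via data table = " + viaDataTable
                    + ", salt via CDC index = " + viaIndex
                    + (viaDataTable == viaIndex ? " (accidentally equal)"
                            : " (mismatch: mutation lands in the wrong bucket)"));
        }
    }

The revised test parameters in CDCQueryIT below (index/table salt buckets of null/4 and 1/2 instead of the previous 1/1) exercise exactly this data-table-versus-index mismatch.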
---
.../org/apache/phoenix/execute/MutationState.java | 2 +-
.../org/apache/phoenix/index/CDCTableInfo.java | 1 -
.../coprocessor/GlobalIndexRegionScanner.java | 4 -
.../phoenix/hbase/index/IndexRegionObserver.java | 3 +-
.../java/org/apache/phoenix/end2end/CDCBaseIT.java | 144 +++++++++++--
.../org/apache/phoenix/end2end/CDCQueryIT.java | 234 ++++++---------------
6 files changed, 197 insertions(+), 191 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index e5a0c3ede2..419b273c72 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -681,7 +681,7 @@ public class MutationState implements SQLCloseable {
};
ImmutableBytesPtr key = new ImmutableBytesPtr(maintainer.buildRowKey(
getter, ptr, null, null, mutationTimestamp));
- PRow row = table.newRow(
+ PRow row = index.newRow(
connection.getKeyValueBuilder(), mutationTimestamp, key, false);
row.delete();
indexMutations.addAll(row.toRowMutations());
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
index 4d80f3e9d2..02fe008ab3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
@@ -164,7 +164,6 @@ public class CDCTableInfo {
if (cdcDataTableRef.getTable().isImmutableRows() &&
cdcDataTableRef.getTable().getImmutableStorageScheme() ==
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-
List<ColumnRef> dataColumns = new ArrayList<ColumnRef>();
PTable table = cdcDataTableRef.getTable();
for (PColumn column : table.getColumns()) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index ad853ef141..f5ef7a87dc 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -1380,10 +1380,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
// CDC Index needs two delete markers one for deleting the index row,
// and the other for referencing the data table delete mutation with
// the right index row key, that is, the index row key starting with ts
- Put cdcDataRowState = new Put(currentDataRowState.getRow());
- cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
- indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
- ByteUtil.EMPTY_BYTE_ARRAY);
indexMutations.add(IndexRegionObserver.getDeleteIndexMutation(
currentDataRowState, indexMaintainer, ts, rowKeyPtr));
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 8be7419780..cf612f4600 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -1038,7 +1038,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
context.indexUpdates.put(hTableInterfaceReference,
new Pair<Mutation, byte[]>(getDeleteIndexMutation(cdcDataRowState,
indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
-
}
}
}
@@ -2004,4 +2003,4 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
public static boolean isAtomicOperationComplete(OperationStatus status) {
return status.getOperationStatusCode() == SUCCESS && status.getResult() != null;
}
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 7e6e82ef62..508f6fced0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -27,7 +27,9 @@ import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
@@ -65,11 +67,14 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.*;
import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
@@ -315,15 +320,24 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
List<Set<ChangeRow>> batches = new ArrayList<>(nBatches);
Set<Map<String, Object>> mutatedRows = new HashSet<>(nRows);
long batchTS = startTS;
+ boolean gotDelete = false;
for (int i = 0; i < nBatches; ++i) {
Set<ChangeRow> batch = new TreeSet<>();
for (int j = 0; j < nRows; ++j) {
if (rand.nextInt(nRows) % 2 == 0) {
- boolean isDelete = mutatedRows.contains(rows.get(j))
- && rand.nextInt(5) == 0;
+ boolean isDelete;
+ if (i > nBatches/2 && ! gotDelete) {
+ // Force a delete if there was none so far.
+ isDelete = true;
+ }
+ else {
+ isDelete = mutatedRows.contains(rows.get(j))
+ && rand.nextInt(5) == 0;
+ }
ChangeRow changeRow;
if (isDelete) {
changeRow = new ChangeRow(null, batchTS, rows.get(j),
null);
+ gotDelete = true;
}
else {
changeRow = new ChangeRow(null, batchTS, rows.get(j),
@@ -337,6 +351,20 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
batchTS += 100;
}
+ // For debug: uncomment to see the mutations generated.
+ LOGGER.debug("----- DUMP Mutations -----");
+ int bnr = 1, mnr = 0;
+ for (Set<ChangeRow> batch: batches) {
+ for (ChangeRow change : batch) {
+ LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + "
" +
+ " tenantId:" + change.tenantId +
+ " changeTS: " + change.changeTS +
+ " pks: " + change.pks +
+ " change: " + change.change);
+ }
+ ++bnr;
+ }
+ LOGGER.debug("----------");
return batches;
}
@@ -365,18 +393,68 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
return row;
}
- protected void applyMutations(CommitAdapter committer, String datatableName, String tid,
- List<Set<ChangeRow>> batches) throws Exception {
+ protected void applyMutations(CommitAdapter committer, String schemaName, String tableName,
+ String datatableName, String tid, List<Set<ChangeRow>> batches,
+ String cdcName)
+ throws Exception {
EnvironmentEdgeManager.injectEdge(injectEdge);
try (Connection conn = committer.getConnection(tid)) {
for (Set<ChangeRow> batch: batches) {
for (ChangeRow changeRow: batch) {
- addChange(conn, datatableName, changeRow);
+ addChange(conn, tableName, changeRow);
}
committer.commit(conn);
}
}
committer.reset();
+
+ // For debug: uncomment to see the exact HBase cells.
+ dumpCells(schemaName, tableName, datatableName, cdcName);
+ }
+
+ protected void dumpCells(String schemaName, String tableName, String datatableName,
+ String cdcName) throws Exception {
+ LOGGER.debug("----- DUMP data table: " + datatableName + " -----");
+ SingleCellIndexIT.dumpTable(datatableName);
+ String indexName = CDCUtil.getCDCIndexName(cdcName);
+ String indexTableName = SchemaUtil.getTableName(schemaName, tableName == datatableName ?
+ indexName : getViewIndexPhysicalName(datatableName));
+ LOGGER.debug("----- DUMP index table: " + indexTableName + " -----");
+ try {
+ SingleCellIndexIT.dumpTable(indexTableName);
+ } catch (TableNotFoundException e) {
+ // Ignore, this would happen if CDC is not yet created. This use case is going to go
+ // away soon anyway.
+ }
+ LOGGER.debug("----------");
+ }
+
+ protected void dumpCDCResults(Connection conn, String cdcName, Map<String, String> pkColumns,
+ String cdcQuery) throws Exception {
+ try (Statement stmt = conn.createStatement()) {
+ try (ResultSet rs = stmt.executeQuery(cdcQuery)) {
+ LOGGER.debug("----- DUMP CDC: " + cdcName + " -----");
+ for (int i = 0; rs.next(); ++i) {
+ LOGGER.debug("CDC row: " + (i+1) + " timestamp="
+ + rs.getDate(1).getTime() + " "
+ + collectColumns(pkColumns, rs) + ", " + CDC_JSON_COL_NAME + "="
+ + rs.getString(pkColumns.size() + 2));
+ }
+ LOGGER.debug("----------");
+ }
+ }
+ }
+
+ private static String collectColumns(Map<String, String> pkColumns, ResultSet rs) {
+ return pkColumns.keySet().stream().map(
+ k -> {
+ try {
+ return k + "=" + rs.getObject(k);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(
+ Collectors.joining(", "));
}
protected void createTable(Connection conn, String tableName, Map<String, String> pkColumns,
@@ -523,9 +601,38 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
}
}
committer.reset();
+ // For debug logging, uncomment this code to see the list of changes.
+ for (int i = 0; i < changes.size(); ++i) {
+ LOGGER.debug("----- generated change: " + i +
+ " tenantId:" + changes.get(i).tenantId +
+ " changeTS: " + changes.get(i).changeTS +
+ " pks: " + changes.get(i).pks +
+ " change: " + changes.get(i).change);
+ }
return changes;
}
+ protected void verifyChangesViaSCN(String tenantId, Connection conn, String cdcFullName,
+ Map<String, String> pkColumns,
+ String dataTableName, Map<String, String> dataColumns,
+ List<ChangeRow> changes, long startTS, long endTS)
+ throws Exception {
+ List<ChangeRow> filteredChanges = new ArrayList<>();
+ for (ChangeRow change: changes) {
+ if (change.changeTS >= startTS && change.changeTS <= endTS) {
+ filteredChanges.add(change);
+ }
+ }
+ String cdcSql = "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROm " +
cdcFullName + " WHERE " +
+ " PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(" + startTS + " AS
BIGINT) AS TIMESTAMP) " +
+ "AND PHOENIX_ROW_TIMESTAMP() <= CAST(CAST(" + endTS + " AS
BIGINT) AS TIMESTAMP)";
+ dumpCDCResults(conn, cdcFullName,
+ new TreeMap<String, String>() {{ put("K1", "INTEGER"); }},
cdcSql);
+ try (ResultSet rs = conn.createStatement().executeQuery(cdcSql)) {
+ verifyChangesViaSCN(tenantId, rs, dataTableName, dataColumns, filteredChanges,
+ CHANGE_IMG);
+ }
+ }
protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTableName,
Map<String, String> dataColumns,
List<ChangeRow> changes,
@@ -533,12 +640,13 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
Set<Map<String, Object>> deletedRows = new HashSet<>();
for (int i = 0, changenr = 0; i < changes.size(); ++i) {
ChangeRow changeRow = changes.get(i);
- if (changeRow.getTenantID() != null && changeRow.getTenantID() != tenantId) {
+ if (tenantId != null && changeRow.getTenantID() != tenantId) {
continue;
}
if (changeRow.getChangeType() == CDC_DELETE_EVENT_TYPE) {
// Consecutive delete operations don't appear as separate events.
if (deletedRows.contains(changeRow.pks)) {
+ ++changenr;
continue;
}
deletedRows.add(changeRow.pks);
@@ -546,13 +654,14 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
else {
deletedRows.remove(changeRow.pks);
}
- String changeDesc = "Change " + (changenr+1) + ": " + changeRow;
+ String changeDesc = "Change " + changenr + ": " + changeRow;
assertTrue(changeDesc, rs.next());
for (Map.Entry<String, Object> pkCol: changeRow.pks.entrySet()) {
assertEquals(changeDesc, pkCol.getValue(),
rs.getObject(pkCol.getKey()));
}
Map<String, Object> cdcObj =
mapper.reader(HashMap.class).readValue(
rs.getString(changeRow.pks.size()+2));
+ assertEquals(changeDesc, changeRow.getChangeType(), cdcObj.get(CDC_EVENT_TYPE));
if (cdcObj.containsKey(CDC_PRE_IMAGE)
&& ! ((Map) cdcObj.get(CDC_PRE_IMAGE)).isEmpty()
&& changeScopes.contains(PTable.CDCChangeScope.PRE)) {
@@ -650,7 +759,9 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
}
protected List<ChangeRow> generateChangesImmutableTable(long startTS,
String[] tenantids,
- String tableName,
CommitAdapter committer)
+ String schemaName,
String tableName,
+ String
datatableName,
+ CommitAdapter
committer, String cdcName)
throws Exception {
List<ChangeRow> changes = new ArrayList<>();
EnvironmentEdgeManager.injectEdge(injectEdge);
@@ -717,6 +828,15 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
}
}
committer.reset();
+ // For debug logging, uncomment this code to see the list of changes.
+ dumpCells(schemaName, tableName, datatableName, cdcName);
+ for (int i = 0; i < changes.size(); ++i) {
+ LOGGER.debug("----- generated change: " + i +
+ " tenantId:" + changes.get(i).tenantId +
+ " changeTS: " + changes.get(i).changeTS +
+ " pks: " + changes.get(i).pks +
+ " change: " + changes.get(i).change);
+ }
return changes;
}
@@ -729,13 +849,13 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
)
protected class ChangeRow implements Comparable<ChangeRow> {
@JsonProperty
- private final String tenantId;
+ protected final String tenantId;
@JsonProperty
- private final long changeTS;
+ protected final long changeTS;
@JsonProperty
- private final Map<String, Object> pks;
+ protected final Map<String, Object> pks;
@JsonProperty
- private final Map<String, Object> change;
+ protected final Map<String, Object> change;
public String getTenantID() {
return tenantId;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index a1b99e90b2..317c2af895 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -41,9 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -53,9 +51,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
-import java.util.stream.Collectors;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
@@ -104,14 +102,13 @@ public class CDCQueryIT extends CDCBaseIT {
}
@Parameterized.Parameters(name = "forView={0}, encodingScheme={1}, " +
- "multitenant={2}, indexSaltBuckets={3}, tableSaltBuckets={4}
withSchemaName=${5}")
+ "multitenant={2}, indexSaltBuckets={3}, tableSaltBuckets={4}
withSchemaName={5}")
public static synchronized Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE },
{ Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.TRUE },
- { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1, Boolean.FALSE },
- // Once PHOENIX-7239, change this to have different salt buckets for data and index.
- { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1, Boolean.TRUE },
+ { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, 4, Boolean.FALSE },
+ { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 2, Boolean.TRUE },
{ Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null, Boolean.FALSE },
{ Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE },
});
@@ -172,11 +169,13 @@ public class CDCQueryIT extends CDCBaseIT {
List<ChangeRow> changes = generateChanges(startTS, tenantids,
tableName, null,
COMMIT_SUCCESS);
- //SingleCellIndexIT.dumpTable(tableName);
- //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
-
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
+ // For debug: uncomment to see the exact results logged to console.
+ dumpCDCResults(conn, cdcName,
+ new TreeMap<String, String>() {{ put("K", "INTEGER"); }},
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */
PHOENIX_ROW_TIMESTAMP(), K," +
+ "\"CDC JSON\" FROM " + cdcFullName);
// Existence of CDC shouldn't cause the regular query path to fail.
String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " "
+
@@ -292,42 +291,15 @@ public class CDCQueryIT extends CDCBaseIT {
Map<String, List<Set<ChangeRow>>> allBatches = new
HashMap<>(tenantids.length);
for (String tid: tenantids) {
allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5));
- // For debug: uncomment to see the exact mutations that are being applied.
- //LOGGER.debug("----- DUMP Mutations -----");
- //int bnr = 1, mnr = 0;
- //for (Set<ChangeRow> batch: allBatches.get(tid)) {
- // for (ChangeRow changeRow : batch) {
- // LOGGER.debug("Mutation: " + (++mnr) + " in batch: " +
bnr + " " + changeRow);
- // }
- // ++bnr;
- //}
- //LOGGER.debug("----------");
- applyMutations(COMMIT_SUCCESS, tableName, tid, allBatches.get(tid));
+ applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid,
+ allBatches.get(tid), cdcName);
}
- // For debug: uncomment to see the exact HBase cells.
- //LOGGER.debug("----- DUMP data table: " + datatableName + " -----");
- //SingleCellIndexIT.dumpTable(datatableName);
- //LOGGER.debug("----- DUMP index table: " +
CDCUtil.getCDCIndexName(cdcName) + " -----");
- //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
- //LOGGER.debug("----------");
-
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
// For debug: uncomment to see the exact results logged to console.
- //try (Statement stmt = conn.createStatement()) {
- // try (ResultSet rs = stmt.executeQuery(
- // "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " +
cdcFullName)) {
- // LOGGER.debug("----- DUMP CDC: " + cdcName + " -----");
- // for (int i = 0; rs.next(); ++i) {
- // LOGGER.debug("CDC row: " + (i+1) + " timestamp="
- // + rs.getDate(1).getTime() + " "
- // + collectColumns(pkColumns, rs) + ", " + CDC_JSON_COL_NAME + "="
- // + rs.getString(pkColumns.size() + 2));
- // }
- // LOGGER.debug("----------");
- // }
- //}
+ dumpCDCResults(conn, cdcName, pkColumns,
+ "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " +
cdcFullName);
List<ChangeRow> changes = new ArrayList<>();
for (Set<ChangeRow> batch: allBatches.get(tenantId)) {
@@ -349,18 +321,6 @@ public class CDCQueryIT extends CDCBaseIT {
}
}
- private static String collectColumns(Map<String, String> pkColumns, ResultSet rs) {
- return pkColumns.keySet().stream().map(
- k -> {
- try {
- return k + "=" + rs.getObject(k);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }).collect(
- Collectors.joining(", "));
- }
-
private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme)
throws Exception {
String cdcName, cdc_sql;
@@ -392,26 +352,21 @@ public class CDCQueryIT extends CDCBaseIT {
}
long startTS = System.currentTimeMillis();
- List<ChangeRow> changes = generateChangesImmutableTable(startTS,
tenantids, tableName,
- COMMIT_SUCCESS);
+ List<ChangeRow> changes = generateChangesImmutableTable(startTS,
tenantids, schemaName,
+ tableName, datatableName, COMMIT_SUCCESS, cdcName);
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
Map<String, String> dataColumns = new TreeMap<String, String>() {{
put("V1", "INTEGER");
put("V2", "INTEGER");
}};
+
try (Connection conn = newConnection(tenantId)) {
// For debug: uncomment to see the exact results logged to console.
- //try (Statement stmt = conn.createStatement()) {
- // try (ResultSet rs = stmt.executeQuery(
- // "SELECT /*+ CDC_INCLUDE(PRE, POST) */
PHOENIX_ROW_TIMESTAMP(), K," +
- // "\"CDC JSON\" FROM " + cdcFullName)) {
- // while (rs.next()) {
- // System.out.println("----- " + rs.getString(1) + " " +
- // rs.getInt(2) + " " + rs.getString(3));
- // }
- // }
- //}
+ dumpCDCResults(conn, cdcName,
+ new TreeMap<String, String>() {{ put("K", "INTEGER"); }},
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */
PHOENIX_ROW_TIMESTAMP(), K," +
+ "\"CDC JSON\" FROM " + cdcFullName);
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
"SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName),
datatableName, dataColumns, changes, PRE_POST_IMG);
@@ -436,15 +391,19 @@ public class CDCQueryIT extends CDCBaseIT {
}
@Test
- public void testSelectTimeRangeQueries() throws Exception {
+ public void testSelectWithTimeRange() throws Exception {
String cdcName, cdc_sql;
String schemaName = withSchemaName ? generateUniqueName() : null;
String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+ String datatableName = tableName;
+ Map<String, String> pkColumns = new TreeMap<String, String>() {{
+ put("K1", "INTEGER");
+ }};
+ Map<String, String> dataColumns = new TreeMap<String, String>() {{
+ put("V1", "INTEGER");
+ }};
try (Connection conn = newConnection()) {
- createTable(conn, "CREATE TABLE " + tableName + " (" +
- (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") +
- "k INTEGER NOT NULL, v1 INTEGER, CONSTRAINT PK PRIMARY KEY
" +
- (multitenant ? "(TENANT_ID, k) " : "(k)") + ")",
encodingScheme, multitenant,
+ createTable(conn, tableName, pkColumns, dataColumns, multitenant,
encodingScheme,
tableSaltBuckets, false, null);
if (forView) {
String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
@@ -453,125 +412,58 @@ public class CDCQueryIT extends CDCBaseIT {
tableName = viewName;
}
cdcName = generateUniqueName();
- cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE
(change)";
createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
}
- EnvironmentEdgeManager.injectEdge(injectEdge);
-
String tenantId = multitenant ? "1000" : null;
String[] tenantids = {tenantId};
if (multitenant) {
tenantids = new String[] {tenantId, "2000"};
}
- Timestamp ts1 = new Timestamp(System.currentTimeMillis());
- cal.setTimeInMillis(ts1.getTime());
- injectEdge.setValue(ts1.getTime());
-
- for (String tid: tenantids) {
- try (Connection conn = newConnection(tid)) {
- conn.createStatement().execute("UPSERT INTO " + tableName + "
(k, v1) VALUES (1, 100)");
- conn.commit();
- }
- }
-
- injectEdge.incrementValue(100);
-
- for (String tid: tenantids) {
- try (Connection conn = newConnection(tid)) {
- conn.createStatement().execute("UPSERT INTO " + tableName + "
(k, v1) VALUES (2, 200)");
- conn.commit();
- }
- }
-
- injectEdge.incrementValue(100);
- cal.add(Calendar.MILLISECOND, 200);
- Timestamp ts2 = new Timestamp(cal.getTime().getTime());
- injectEdge.incrementValue(100);
-
- for (String tid: tenantids) {
- try (Connection conn = newConnection(tid)) {
- conn.createStatement().execute("UPSERT INTO " + tableName + "
(k, v1) VALUES (1, 101)");
- conn.commit();
- injectEdge.incrementValue(100);
- conn.createStatement().execute("UPSERT INTO " + tableName + "
(k, v1) VALUES (3, 300)");
- conn.commit();
- }
- }
-
- injectEdge.incrementValue(100);
- cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length);
- Timestamp ts3 = new Timestamp(cal.getTime().getTime());
- injectEdge.incrementValue(100);
-
+ long startTS = System.currentTimeMillis();
+ Map<String, List<Set<ChangeRow>>> allBatches = new
HashMap<>(tenantids.length);
for (String tid: tenantids) {
- try (Connection conn = newConnection(tid)) {
- conn.createStatement().execute("UPSERT INTO " + tableName + "
(k, v1) VALUES (1, 101)");
- conn.commit();
- injectEdge.incrementValue(100);
- conn.createStatement().execute("DELETE FROM " + tableName + "
WHERE k = 2");
- conn.commit();
- }
+ allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5));
+ applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid,
+ allBatches.get(tid), cdcName);
}
- injectEdge.incrementValue(100);
- cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length);
- Timestamp ts4 = new Timestamp(cal.getTime().getTime());
- EnvironmentEdgeManager.reset();
-
- //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
-
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
- String sel_sql =
- "SELECT to_char(phoenix_row_timestamp()), k, \"CDC JSON\"
FROM " + cdcFullName +
- " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND
PHOENIX_ROW_TIMESTAMP() <= ?";
- Object[] testDataSets = new Object[] {
- new Object[] {ts1, ts2, new int[] {1, 2}},
- new Object[] {ts2, ts3, new int[] {1, 3}},
- new Object[] {ts3, ts4, new int[] {1, 2}},
- new Object[] {ts1, ts4, new int[] {1, 2, 1, 3, 1, 2}},
- };
- PreparedStatement stmt = conn.prepareStatement(sel_sql);
// For debug: uncomment to see the exact results logged to console.
- //System.out.println("----- ts1: " + ts1 + " ts2: " + ts2 + " ts3:
" + ts3 + " ts4: " +
- // ts4);
- //for (int i = 0; i < testDataSets.length; ++i) {
- // Object[] testData = (Object[]) testDataSets[i];
- // stmt.setTimestamp(1, (Timestamp) testData[0]);
- // stmt.setTimestamp(2, (Timestamp) testData[1]);
- // try (ResultSet rs = stmt.executeQuery()) {
- // System.out.println("----- Test data set: " + i);
- // while (rs.next()) {
- // System.out.println("----- " + rs.getString(1) + " " +
- // rs.getInt(2) + " " + rs.getString(3));
- // }
- // }
- //}
- for (int i = 0; i < testDataSets.length; ++i) {
- Object[] testData = (Object[]) testDataSets[i];
- stmt.setTimestamp(1, (Timestamp) testData[0]);
- stmt.setTimestamp(2, (Timestamp) testData[1]);
- try (ResultSet rs = stmt.executeQuery()) {
- for (int j = 0; j < ((int[]) testData[2]).length; ++j) {
- int k = ((int[]) testData[2])[j];
- assertEquals(" Index: " + j + " Test data set: " + i,
- true, rs.next());
- assertEquals(" Index: " + j + " Test data set: " + i,
- k, rs.getInt(2));
- }
- assertEquals("Test data set: " + i, false, rs.next());
- }
- }
+ dumpCDCResults(conn, cdcName, pkColumns,
+ "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " +
cdcFullName);
- PreparedStatement pstmt = conn.prepareStatement(
- "SELECT * FROM " + cdcFullName + " WHERE
PHOENIX_ROW_TIMESTAMP() > ?");
- pstmt.setTimestamp(1, ts4);
- try (ResultSet rs = pstmt.executeQuery()) {
- assertEquals(false, rs.next());
+ List<ChangeRow> changes = new ArrayList<>();
+ for (Set<ChangeRow> batch: allBatches.get(tenantId)) {
+ changes.addAll(batch);
+ }
+ List<Long> uniqueTimestamps = new ArrayList<>();
+ Integer lastDeletionTSpos = null;
+ for (ChangeRow change: changes) {
+ if (uniqueTimestamps.size() == 0 ||
+ uniqueTimestamps.get(uniqueTimestamps.size()-1) != change.changeTS) {
+ uniqueTimestamps.add(change.changeTS);
+ }
+ if (change.change == null) {
+ lastDeletionTSpos = uniqueTimestamps.size() - 1;
+ }
}
+ Random rand = new Random();
+ int randMinTSpos = rand.nextInt(lastDeletionTSpos - 1);
+ int randMaxTSpos = randMinTSpos + 1 + rand.nextInt(
+ uniqueTimestamps.size() - (randMinTSpos + 1));
+ verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns,
+ datatableName, dataColumns, changes, 0, System.currentTimeMillis());
+ verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns,
+ datatableName, dataColumns, changes, uniqueTimestamps.get(randMinTSpos),
+ uniqueTimestamps.get(randMaxTSpos));
+ verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns,
+ datatableName, dataColumns, changes, uniqueTimestamps.get(randMinTSpos),
+ uniqueTimestamps.get(lastDeletionTSpos));
+ verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns,
+ datatableName, dataColumns, changes, uniqueTimestamps.get(lastDeletionTSpos),
+ uniqueTimestamps.get(randMaxTSpos));
}
}