This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new f3eb74473a PHOENIX-7409: Fix the bug that causes an incorrect salt bucket number to be used for mutations generated in the CDC index. (#1989)
f3eb74473a is described below

commit f3eb74473afd44a54d8aa102c874eeb460199de3
Author: Hari Krishna Dara <[email protected]>
AuthorDate: Fri Oct 4 19:28:13 2024 +0530

    PHOENIX-7409: Fix the bug that causes an incorrect salt bucket number to be used for mutations generated in the CDC index. (#1989)

    - Revamped the time range test and added coverage for the special CDC DF markers. Also refactored some debug code.
    - Fixed issues with the debug logging code.
    - Uncommented debug code to help diagnose the occasional flappers (flaky test runs) we are seeing.
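
    To illustrate the failure mode: Phoenix prefixes every row key of a salted
    table with a salt byte computed from the rest of the key modulo the table's
    SALT_BUCKETS count. The CDC index and its data table can be created with
    different bucket counts, so salting an index row key with the data table's
    count produces a key the index's own read and verification paths would
    never compute. A minimal sketch of the idea (hypothetical helper, not
    Phoenix's actual SaltingUtil):

        // Hypothetical sketch: salt byte = hash(key minus salt slot) % nBuckets.
        static byte saltByte(byte[] rowKey, int nBuckets) {
            int hash = 0;
            for (int i = 1; i < rowKey.length; i++) { // byte 0 is reserved for the salt
                hash = 31 * hash + rowKey[i];
            }
            return (byte) Math.floorMod(hash, nBuckets);
        }
        // E.g. with tableSaltBuckets=2 and indexSaltBuckets=1, saltByte(key, 2)
        // can differ from saltByte(key, 1), so a CDC index mutation salted with
        // the data table's count lands in the wrong salt bucket.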
---
 .../org/apache/phoenix/execute/MutationState.java  |   2 +-
 .../org/apache/phoenix/index/CDCTableInfo.java     |   1 -
 .../coprocessor/GlobalIndexRegionScanner.java      |   4 -
 .../phoenix/hbase/index/IndexRegionObserver.java   |   3 +-
 .../java/org/apache/phoenix/end2end/CDCBaseIT.java | 144 +++++++++++--
 .../org/apache/phoenix/end2end/CDCQueryIT.java     | 234 ++++++---------------
 6 files changed, 197 insertions(+), 191 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index e5a0c3ede2..419b273c72 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -681,7 +681,7 @@ public class MutationState implements SQLCloseable {
                 };
                 ImmutableBytesPtr key = new ImmutableBytesPtr(maintainer.buildRowKey(
                         getter, ptr, null, null, mutationTimestamp));
-                PRow row = table.newRow(
+                PRow row = index.newRow(
                         connection.getKeyValueBuilder(), mutationTimestamp, key, false);
                 row.delete();
                 indexMutations.addAll(row.toRowMutations());
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
index 4d80f3e9d2..02fe008ab3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
@@ -164,7 +164,6 @@ public class CDCTableInfo {
         if (cdcDataTableRef.getTable().isImmutableRows() &&
                 cdcDataTableRef.getTable().getImmutableStorageScheme() ==
                         PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-
             List<ColumnRef> dataColumns = new ArrayList<ColumnRef>();
             PTable table = cdcDataTableRef.getTable();
             for (PColumn column : table.getColumns()) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index ad853ef141..f5ef7a87dc 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -1380,10 +1380,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
                             // CDC Index needs two delete markers: one for deleting the index row,
                             // and the other for referencing the data table delete mutation with
                             // the right index row key, that is, the index row key starting with ts
-                            Put cdcDataRowState = new Put(currentDataRowState.getRow());
-                            cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
-                                    indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
-                                    ByteUtil.EMPTY_BYTE_ARRAY);
                             indexMutations.add(IndexRegionObserver.getDeleteIndexMutation(
                                     currentDataRowState, indexMaintainer, ts, rowKeyPtr));
                         }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 8be7419780..cf612f4600 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -1038,7 +1038,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                         context.indexUpdates.put(hTableInterfaceReference,
                                 new Pair<Mutation, byte[]>(getDeleteIndexMutation(cdcDataRowState,
                                         indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
-
                     }
                 }
             }
@@ -2004,4 +2003,4 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
     public static boolean isAtomicOperationComplete(OperationStatus status) {
         return status.getOperationStatusCode() == SUCCESS && status.getResult() != null;
     }
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 7e6e82ef62..508f6fced0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -27,7 +27,9 @@ import com.fasterxml.jackson.core.Version;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
@@ -65,11 +67,14 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.*;
 import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
 import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
 import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
 import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
 import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
@@ -315,15 +320,24 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
         List<Set<ChangeRow>> batches = new ArrayList<>(nBatches);
         Set<Map<String, Object>> mutatedRows = new HashSet<>(nRows);
         long batchTS = startTS;
+        boolean gotDelete = false;
         for (int i = 0; i < nBatches; ++i) {
             Set<ChangeRow> batch = new TreeSet<>();
             for (int j = 0; j < nRows; ++j) {
                 if (rand.nextInt(nRows) % 2 == 0) {
-                    boolean isDelete = mutatedRows.contains(rows.get(j))
-                            && rand.nextInt(5) == 0;
+                    boolean isDelete;
+                    if (i > nBatches/2 && ! gotDelete) {
+                        // Force a delete if there was none so far.
+                        isDelete = true;
+                    }
+                    else {
+                        isDelete = mutatedRows.contains(rows.get(j))
+                                && rand.nextInt(5) == 0;
+                    }
                     ChangeRow changeRow;
                     if (isDelete) {
                         changeRow = new ChangeRow(null, batchTS, rows.get(j), null);
+                        gotDelete = true;
                     }
                     else {
                         changeRow = new ChangeRow(null, batchTS, rows.get(j),
@@ -337,6 +351,20 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
             batchTS += 100;
         }
 
+        // For debug: logs the generated mutations.
+        LOGGER.debug("----- DUMP Mutations -----");
+        int bnr = 1, mnr = 0;
+        for (Set<ChangeRow> batch: batches) {
+            for (ChangeRow change : batch) {
+                LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + " 
" +
+                    " tenantId:" + change.tenantId +
+                    " changeTS: " + change.changeTS +
+                    " pks: " + change.pks +
+                    " change: " + change.change);
+            }
+            ++bnr;
+        }
+        LOGGER.debug("----------");
         return batches;
     }
 
@@ -365,18 +393,68 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
         return row;
     }
 
-    protected void applyMutations(CommitAdapter committer, String datatableName, String tid,
-                                  List<Set<ChangeRow>> batches) throws Exception {
+    protected void applyMutations(CommitAdapter committer, String schemaName, String tableName,
+                                  String datatableName, String tid, List<Set<ChangeRow>> batches,
+                                  String cdcName)
+            throws Exception {
         EnvironmentEdgeManager.injectEdge(injectEdge);
         try (Connection conn = committer.getConnection(tid)) {
             for (Set<ChangeRow> batch: batches) {
                 for (ChangeRow changeRow: batch) {
-                    addChange(conn, datatableName, changeRow);
+                    addChange(conn, tableName, changeRow);
                 }
                 committer.commit(conn);
             }
         }
         committer.reset();
+
+        // For debug: dumps the exact HBase cells.
+        dumpCells(schemaName, tableName, datatableName, cdcName);
+    }
+
+    protected void dumpCells(String schemaName, String tableName, String datatableName,
+                             String cdcName) throws Exception {
+        LOGGER.debug("----- DUMP data table: " + datatableName + " -----");
+        SingleCellIndexIT.dumpTable(datatableName);
+        String indexName = CDCUtil.getCDCIndexName(cdcName);
+        String indexTableName = SchemaUtil.getTableName(schemaName, tableName.equals(datatableName) ?
+                indexName : getViewIndexPhysicalName(datatableName));
+        LOGGER.debug("----- DUMP index table: " + indexTableName + " -----");
+        try {
+            SingleCellIndexIT.dumpTable(indexTableName);
+        } catch (TableNotFoundException e) {
+            // Ignore, this would happen if CDC is not yet created. This use case is going to go
+            // away soon anyway.
+        }
+        LOGGER.debug("----------");
+    }
+
+    protected void dumpCDCResults(Connection conn, String cdcName, Map<String, String> pkColumns,
+                                  String cdcQuery) throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            try (ResultSet rs = stmt.executeQuery(cdcQuery)) {
+                LOGGER.debug("----- DUMP CDC: " + cdcName + " -----");
+                for (int i = 0; rs.next(); ++i) {
+                    LOGGER.debug("CDC row: " + (i+1) + " timestamp="
+                            + rs.getDate(1).getTime() + " "
+                            + collectColumns(pkColumns, rs) + ", " + CDC_JSON_COL_NAME + "="
+                            + rs.getString(pkColumns.size() + 2));
+                }
+                LOGGER.debug("----------");
+            }
+        }
+    }
+
+    private static String collectColumns(Map<String, String> pkColumns, ResultSet rs) {
+        return pkColumns.keySet().stream().map(
+                k -> {
+                    try {
+                        return k + "=" + rs.getObject(k);
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                }).collect(
+                Collectors.joining(", "));
     }
 
     protected void createTable(Connection conn, String tableName, Map<String, String> pkColumns,
@@ -523,9 +601,38 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
             }
         }
         committer.reset();
+        // For debug logging: dumps the list of generated changes.
+        for (int i = 0; i < changes.size(); ++i) {
+            LOGGER.debug("----- generated change: " + i +
+                    " tenantId:" + changes.get(i).tenantId +
+                    " changeTS: " + changes.get(i).changeTS +
+                    " pks: " + changes.get(i).pks +
+                    " change: " + changes.get(i).change);
+        }
         return changes;
     }
 
+    protected void verifyChangesViaSCN(String tenantId, Connection conn, String cdcFullName,
+                                       Map<String, String> pkColumns,
+                                       String dataTableName, Map<String, String> dataColumns,
+                                       List<ChangeRow> changes, long startTS, long endTS)
+            throws Exception {
+        List<ChangeRow> filteredChanges = new ArrayList<>();
+        for (ChangeRow change: changes) {
+            if (change.changeTS >= startTS && change.changeTS <= endTS) {
+                filteredChanges.add(change);
+            }
+        }
+        String cdcSql = "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName + " WHERE " +
+                " PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(" + startTS + " AS BIGINT) AS TIMESTAMP) " +
+                "AND PHOENIX_ROW_TIMESTAMP() <= CAST(CAST(" + endTS + " AS BIGINT) AS TIMESTAMP)";
+        dumpCDCResults(conn, cdcFullName, pkColumns, cdcSql);
+        try (ResultSet rs = conn.createStatement().executeQuery(cdcSql)) {
+            verifyChangesViaSCN(tenantId, rs, dataTableName, dataColumns, filteredChanges,
+                    CHANGE_IMG);
+        }
+    }
 
     protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTableName,
                                        Map<String, String> dataColumns, List<ChangeRow> changes,
@@ -533,12 +640,13 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
         Set<Map<String, Object>> deletedRows = new HashSet<>();
         for (int i = 0, changenr = 0; i < changes.size(); ++i) {
             ChangeRow changeRow = changes.get(i);
-            if (changeRow.getTenantID() != null && changeRow.getTenantID() != tenantId) {
+            if (tenantId != null && !tenantId.equals(changeRow.getTenantID())) {
                 continue;
             }
             if (changeRow.getChangeType() == CDC_DELETE_EVENT_TYPE) {
                 // Consecutive delete operations don't appear as separate events.
                 if (deletedRows.contains(changeRow.pks)) {
+                    ++changenr;
                     continue;
                 }
                 deletedRows.add(changeRow.pks);
@@ -546,13 +654,14 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
             else {
                 deletedRows.remove(changeRow.pks);
             }
-            String changeDesc = "Change " + (changenr+1) + ": " + changeRow;
+            String changeDesc = "Change " + changenr + ": " + changeRow;
             assertTrue(changeDesc, rs.next());
             for (Map.Entry<String, Object> pkCol: changeRow.pks.entrySet()) {
                 assertEquals(changeDesc, pkCol.getValue(), rs.getObject(pkCol.getKey()));
             }
             Map<String, Object> cdcObj = mapper.reader(HashMap.class).readValue(
                     rs.getString(changeRow.pks.size()+2));
+            assertEquals(changeDesc, changeRow.getChangeType(), cdcObj.get(CDC_EVENT_TYPE));
             if (cdcObj.containsKey(CDC_PRE_IMAGE)
                     && ! ((Map) cdcObj.get(CDC_PRE_IMAGE)).isEmpty()
                     && changeScopes.contains(PTable.CDCChangeScope.PRE)) {
@@ -650,7 +759,9 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
     }
 
     protected List<ChangeRow> generateChangesImmutableTable(long startTS, String[] tenantids,
-                                                            String tableName, CommitAdapter committer)
+                                                            String schemaName, String tableName,
+                                                            String datatableName,
+                                                            CommitAdapter committer, String cdcName)
             throws Exception {
         List<ChangeRow> changes = new ArrayList<>();
         EnvironmentEdgeManager.injectEdge(injectEdge);
@@ -717,6 +828,15 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
             }
         }
         committer.reset();
+        // For debug logging: dumps the list of generated changes.
+        dumpCells(schemaName, tableName, datatableName, cdcName);
+        for (int i = 0; i < changes.size(); ++i) {
+            LOGGER.debug("----- generated change: " + i +
+                    " tenantId:" + changes.get(i).tenantId +
+                    " changeTS: " + changes.get(i).changeTS +
+                    " pks: " + changes.get(i).pks +
+                    " change: " + changes.get(i).change);
+        }
         return changes;
     }
 
@@ -729,13 +849,13 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
     )
     protected class ChangeRow implements Comparable<ChangeRow> {
         @JsonProperty
-        private final String tenantId;
+        protected final String tenantId;
         @JsonProperty
-        private final long changeTS;
+        protected final long changeTS;
         @JsonProperty
-        private final Map<String, Object> pks;
+        protected final Map<String, Object> pks;
         @JsonProperty
-        private final Map<String, Object> change;
+        protected final Map<String, Object> change;
 
         public String getTenantID() {
             return tenantId;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index a1b99e90b2..317c2af895 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -41,9 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,9 +51,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.stream.Collectors;
 
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
@@ -104,14 +102,13 @@ public class CDCQueryIT extends CDCBaseIT {
     }
 
     @Parameterized.Parameters(name = "forView={0}, encodingScheme={1}, " +
-            "multitenant={2}, indexSaltBuckets={3}, tableSaltBuckets={4} 
withSchemaName=${5}")
+            "multitenant={2}, indexSaltBuckets={3}, tableSaltBuckets={4} 
withSchemaName={5}")
     public static synchronized Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
                 { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE },
                 { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.TRUE },
-                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1, Boolean.FALSE },
-                // Once PHOENIX-7239, change this to have different salt buckets for data and index.
-                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1, Boolean.TRUE },
+                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, 4, Boolean.FALSE },
+                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 2, Boolean.TRUE },
                 { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null, Boolean.FALSE },
                 { Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE },
         });
@@ -172,11 +169,13 @@ public class CDCQueryIT extends CDCBaseIT {
         List<ChangeRow> changes = generateChanges(startTS, tenantids, tableName, null,
                 COMMIT_SUCCESS);
 
-        //SingleCellIndexIT.dumpTable(tableName);
-        //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
-
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         try (Connection conn = newConnection(tenantId)) {
+            // For debug: logs the exact results to the console.
+            dumpCDCResults(conn, cdcName,
+                    new TreeMap<String, String>() {{ put("K", "INTEGER"); }},
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ 
PHOENIX_ROW_TIMESTAMP(), K," +
+                            "\"CDC JSON\" FROM " + cdcFullName);
 
             // Existence of CDC shouldn't cause the regular query path to fail.
             String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " " +
@@ -292,42 +291,15 @@ public class CDCQueryIT extends CDCBaseIT {
         Map<String, List<Set<ChangeRow>>> allBatches = new HashMap<>(tenantids.length);
         for (String tid: tenantids) {
             allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5));
-            // For debug: uncomment to see the exact mutations that are being applied.
-            //LOGGER.debug("----- DUMP Mutations -----");
-            //int bnr = 1, mnr = 0;
-            //for (Set<ChangeRow> batch: allBatches.get(tid)) {
-            //    for (ChangeRow changeRow : batch) {
-            //        LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + 
bnr + " " + changeRow);
-            //    }
-            //    ++bnr;
-            //}
-            //LOGGER.debug("----------");
-            applyMutations(COMMIT_SUCCESS, tableName, tid, allBatches.get(tid));
+            applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid,
+                    allBatches.get(tid), cdcName);
         }
 
-        // For debug: uncomment to see the exact HBase cells.
-        //LOGGER.debug("----- DUMP data table: " + datatableName + " -----");
-        //SingleCellIndexIT.dumpTable(datatableName);
-        //LOGGER.debug("----- DUMP index table: " + 
CDCUtil.getCDCIndexName(cdcName) + " -----");
-        //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
-        //LOGGER.debug("----------");
-
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         try (Connection conn = newConnection(tenantId)) {
             // For debug: logs the exact results to the console.
-            //try (Statement stmt = conn.createStatement()) {
-            //    try (ResultSet rs = stmt.executeQuery(
-            //            "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + 
cdcFullName)) {
-            //        LOGGER.debug("----- DUMP CDC: " + cdcName + " -----");
-            //        for (int i = 0; rs.next(); ++i) {
-            //            LOGGER.debug("CDC row: " + (i+1) + " timestamp="
-            //                    + rs.getDate(1).getTime() + " "
-            //                    + collectColumns(pkColumns, rs) + ", " + CDC_JSON_COL_NAME + "="
-            //                    + rs.getString(pkColumns.size() + 2));
-            //        }
-            //        LOGGER.debug("----------");
-            //    }
-            //}
+            dumpCDCResults(conn, cdcName, pkColumns,
+                    "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + 
cdcFullName);
 
             List<ChangeRow> changes = new ArrayList<>();
             for (Set<ChangeRow> batch: allBatches.get(tenantId)) {
@@ -349,18 +321,6 @@ public class CDCQueryIT extends CDCBaseIT {
         }
     }
 
-    private static String collectColumns(Map<String, String> pkColumns, ResultSet rs) {
-        return pkColumns.keySet().stream().map(
-                k -> {
-                    try {
-                        return k + "=" + rs.getObject(k);
-                    } catch (SQLException e) {
-                        throw new RuntimeException(e);
-                    }
-                }).collect(
-                Collectors.joining(", "));
-    }
-
     private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme)
             throws Exception {
         String cdcName, cdc_sql;
@@ -392,26 +352,21 @@ public class CDCQueryIT extends CDCBaseIT {
         }
 
         long startTS = System.currentTimeMillis();
-        List<ChangeRow> changes = generateChangesImmutableTable(startTS, tenantids, tableName,
-                COMMIT_SUCCESS);
+        List<ChangeRow> changes = generateChangesImmutableTable(startTS, tenantids, schemaName,
+                tableName, datatableName, COMMIT_SUCCESS, cdcName);
 
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         Map<String, String> dataColumns = new TreeMap<String, String>() {{
             put("V1", "INTEGER");
             put("V2", "INTEGER");
         }};
+
         try (Connection conn = newConnection(tenantId)) {
             // For debug: logs the exact results to the console.
-            //try (Statement stmt = conn.createStatement()) {
-            //    try (ResultSet rs = stmt.executeQuery(
-            //            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ 
PHOENIX_ROW_TIMESTAMP(), K," +
-            //                    "\"CDC JSON\" FROM " + cdcFullName)) {
-            //        while (rs.next()) {
-            //            System.out.println("----- " + rs.getString(1) + " " +
-            //                    rs.getInt(2) + " " + rs.getString(3));
-            //        }
-            //    }
-            //}
+            dumpCDCResults(conn, cdcName,
+                    new TreeMap<String, String>() {{ put("K", "INTEGER"); }},
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ 
PHOENIX_ROW_TIMESTAMP(), K," +
+                            "\"CDC JSON\" FROM " + cdcFullName);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
                             "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcFullName),
                     datatableName, dataColumns, changes, PRE_POST_IMG);
@@ -436,15 +391,19 @@ public class CDCQueryIT extends CDCBaseIT {
     }
 
     @Test
-    public void testSelectTimeRangeQueries() throws Exception {
+    public void testSelectWithTimeRange() throws Exception {
         String cdcName, cdc_sql;
         String schemaName = withSchemaName ? generateUniqueName() : null;
         String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String datatableName = tableName;
+        Map<String, String> pkColumns = new TreeMap<String, String>() {{
+            put("K1", "INTEGER");
+        }};
+        Map<String, String> dataColumns = new TreeMap<String, String>() {{
+            put("V1", "INTEGER");
+        }};
         try (Connection conn = newConnection()) {
-            createTable(conn, "CREATE TABLE  " + tableName + " (" +
-                    (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") +
-                    "k INTEGER NOT NULL, v1 INTEGER, CONSTRAINT PK PRIMARY KEY 
" +
-                    (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", 
encodingScheme, multitenant,
+            createTable(conn, tableName, pkColumns, dataColumns, multitenant, 
encodingScheme,
                     tableSaltBuckets, false, null);
             if (forView) {
                 String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
@@ -453,125 +412,58 @@ public class CDCQueryIT extends CDCBaseIT {
                 tableName = viewName;
             }
             cdcName = generateUniqueName();
-            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE 
(change)";
             createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
         }
 
-        EnvironmentEdgeManager.injectEdge(injectEdge);
-
         String tenantId = multitenant ? "1000" : null;
         String[] tenantids = {tenantId};
         if (multitenant) {
             tenantids = new String[] {tenantId, "2000"};
         }
 
-        Timestamp ts1 = new Timestamp(System.currentTimeMillis());
-        cal.setTimeInMillis(ts1.getTime());
-        injectEdge.setValue(ts1.getTime());
-
-        for (String tid: tenantids) {
-            try (Connection conn = newConnection(tid)) {
-                conn.createStatement().execute("UPSERT INTO " + tableName + " 
(k, v1) VALUES (1, 100)");
-                conn.commit();
-            }
-        }
-
-        injectEdge.incrementValue(100);
-
-        for (String tid: tenantids) {
-            try (Connection conn = newConnection(tid)) {
-                conn.createStatement().execute("UPSERT INTO " + tableName + " 
(k, v1) VALUES (2, 200)");
-                conn.commit();
-            }
-        }
-
-        injectEdge.incrementValue(100);
-        cal.add(Calendar.MILLISECOND, 200);
-        Timestamp ts2 = new Timestamp(cal.getTime().getTime());
-        injectEdge.incrementValue(100);
-
-        for (String tid: tenantids) {
-            try (Connection conn = newConnection(tid)) {
-                conn.createStatement().execute("UPSERT INTO " + tableName + " 
(k, v1) VALUES (1, 101)");
-                conn.commit();
-                injectEdge.incrementValue(100);
-                conn.createStatement().execute("UPSERT INTO " + tableName + " 
(k, v1) VALUES (3, 300)");
-                conn.commit();
-            }
-        }
-
-        injectEdge.incrementValue(100);
-        cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length);
-        Timestamp ts3 = new Timestamp(cal.getTime().getTime());
-        injectEdge.incrementValue(100);
-
+        long startTS = System.currentTimeMillis();
+        Map<String, List<Set<ChangeRow>>> allBatches = new HashMap<>(tenantids.length);
         for (String tid: tenantids) {
-            try (Connection conn = newConnection(tid)) {
-                conn.createStatement().execute("UPSERT INTO " + tableName + " 
(k, v1) VALUES (1, 101)");
-                conn.commit();
-                injectEdge.incrementValue(100);
-                conn.createStatement().execute("DELETE FROM " + tableName + " 
WHERE k = 2");
-                conn.commit();
-            }
+            allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5));
+            applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid,
+                    allBatches.get(tid), cdcName);
         }
 
-        injectEdge.incrementValue(100);
-        cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length);
-        Timestamp ts4 = new Timestamp(cal.getTime().getTime());
-        EnvironmentEdgeManager.reset();
-
-        //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
-
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         try (Connection conn = newConnection(tenantId)) {
-            String sel_sql =
-                    "SELECT to_char(phoenix_row_timestamp()), k, \"CDC JSON\" 
FROM " + cdcFullName +
-                            " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND 
PHOENIX_ROW_TIMESTAMP() <= ?";
-            Object[] testDataSets = new Object[] {
-                    new Object[] {ts1, ts2, new int[] {1, 2}},
-                    new Object[] {ts2, ts3, new int[] {1, 3}},
-                    new Object[] {ts3, ts4, new int[] {1, 2}},
-                    new Object[] {ts1, ts4, new int[] {1, 2, 1, 3, 1, 2}},
-            };
-            PreparedStatement stmt = conn.prepareStatement(sel_sql);
             // For debug: logs the exact results to the console.
-            //System.out.println("----- ts1: " + ts1 + " ts2: " + ts2 + " ts3: 
" + ts3 + " ts4: " +
-            //        ts4);
-            //for (int i = 0; i < testDataSets.length; ++i) {
-            //    Object[] testData = (Object[]) testDataSets[i];
-            //    stmt.setTimestamp(1, (Timestamp) testData[0]);
-            //    stmt.setTimestamp(2, (Timestamp) testData[1]);
-            //    try (ResultSet rs = stmt.executeQuery()) {
-            //        System.out.println("----- Test data set: " + i);
-            //        while (rs.next()) {
-            //            System.out.println("----- " + rs.getString(1) + " " +
-            //                    rs.getInt(2) + " "  + rs.getString(3));
-            //        }
-            //    }
-            //}
-            for (int i = 0; i < testDataSets.length; ++i) {
-                Object[] testData = (Object[]) testDataSets[i];
-                stmt.setTimestamp(1, (Timestamp) testData[0]);
-                stmt.setTimestamp(2, (Timestamp) testData[1]);
-                try (ResultSet rs = stmt.executeQuery()) {
-                    for (int j = 0; j < ((int[]) testData[2]).length; ++j) {
-                        int k = ((int[]) testData[2])[j];
-                        assertEquals(" Index: " + j + " Test data set: " + i,
-                                true, rs.next());
-                        assertEquals(" Index: " + j + " Test data set: " + i,
-                                k, rs.getInt(2));
-                    }
-                    assertEquals("Test data set: " + i, false, rs.next());
-                }
-            }
+            dumpCDCResults(conn, cdcName, pkColumns,
+                    "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + 
cdcFullName);
 
-            PreparedStatement pstmt = conn.prepareStatement(
-                    "SELECT * FROM " + cdcFullName + " WHERE 
PHOENIX_ROW_TIMESTAMP() > ?");
-            pstmt.setTimestamp(1, ts4);
-            try (ResultSet rs = pstmt.executeQuery()) {
-                assertEquals(false, rs.next());
+            List<ChangeRow> changes = new ArrayList<>();
+            for (Set<ChangeRow> batch: allBatches.get(tenantId)) {
+                changes.addAll(batch);
+            }
+            List<Long> uniqueTimestamps = new ArrayList<>();
+            Integer lastDeletionTSpos = null;
+            for (ChangeRow change: changes) {
+                if (uniqueTimestamps.size() == 0 ||
+                        uniqueTimestamps.get(uniqueTimestamps.size()-1) != change.changeTS) {
+                    uniqueTimestamps.add(change.changeTS);
+                }
+                if (change.change == null) {
+                    lastDeletionTSpos = uniqueTimestamps.size() - 1;
+                }
             }
+            Random rand = new Random();
+            int randMinTSpos = rand.nextInt(lastDeletionTSpos - 1);
+            int randMaxTSpos = randMinTSpos + 1 + rand.nextInt(
+                    uniqueTimestamps.size() - (randMinTSpos + 1));
+            verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns,
+                    datatableName, dataColumns, changes, 0, System.currentTimeMillis());
+            verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns,
+                    datatableName, dataColumns, changes, uniqueTimestamps.get(randMinTSpos),
+                    uniqueTimestamps.get(randMaxTSpos));
+            verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns,
+                    datatableName, dataColumns, changes, uniqueTimestamps.get(randMinTSpos),
+                    uniqueTimestamps.get(lastDeletionTSpos));
+            verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns,
+                    datatableName, dataColumns, changes, uniqueTimestamps.get(lastDeletionTSpos),
+                    uniqueTimestamps.get(randMaxTSpos));
         }
     }
 
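
The revamped time range test above picks its query windows by position within
the list of distinct commit timestamps and maps those positions back to
epoch-millisecond bounds for the PHOENIX_ROW_TIMESTAMP() predicate. A
condensed sketch of that selection (names mirror the test code above; this is
an illustration, not the verbatim test):

    // Pick a random window [minTS, maxTS] over the distinct commit timestamps,
    // chosen so the window can start before the last delete event.
    static long[] pickWindow(List<Long> uniqueTimestamps, int lastDeletionTSpos, Random rand) {
        int minPos = rand.nextInt(lastDeletionTSpos - 1);
        int maxPos = minPos + 1 + rand.nextInt(uniqueTimestamps.size() - (minPos + 1));
        return new long[] { uniqueTimestamps.get(minPos), uniqueTimestamps.get(maxPos) };
    }
    // The bounds are then applied in SQL as:
    //   PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(minTS AS BIGINT) AS TIMESTAMP)
    //   AND PHOENIX_ROW_TIMESTAMP() <= CAST(CAST(maxTS AS BIGINT) AS TIMESTAMP)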

