This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 835de2b  PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers
835de2b is described below

commit 835de2bb57bcb47ee6d0132cb69a58419d3ee552
Author: Kadir <[email protected]>
AuthorDate: Sun Oct 20 17:40:35 2019 -0700

    PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 67 ++++++++++++++++++++++
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 29 ++++++----
 .../UngroupedAggregateRegionObserver.java          | 19 +++++-
 .../apache/phoenix/index/GlobalIndexChecker.java   | 10 ++--
 4 files changed, 109 insertions(+), 16 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index a4b7bdc..c71da6b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -251,6 +252,72 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private void setEveryNthRowWithNull(int nrows, int nthRowNull, 
PreparedStatement stmt) throws Exception {
+        for (int i = 0; i < nrows; i++) {
+            stmt.setInt(1, i);
+            stmt.setInt(2, i * 10);
+            if (i % nthRowNull != 0) {
+                stmt.setInt(3, 9000 + i * nthRowNull);
+            } else {
+                stmt.setNull(3, Types.INTEGER);
+            }
+            stmt.execute();
+        }
+    }
+
+    @Test
+    public void testWithSetNull() throws Exception {
+        // This test is for building non-transactional global indexes with 
direct api
+        if (localIndex || transactional) {
+            return;
+        }
+        // This tests the cases where a column having a null value is 
overwritten with a not null value and vice versa;
+        // and after that the index table is still rebuilt correctly
+        final int NROWS = 2 * 3 * 5 * 7;
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, 
dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, 
indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String stmString1 =
+                    "CREATE TABLE " + dataTableFullName
+                            + " (ID INTEGER NOT NULL PRIMARY KEY, VAL INTEGER, 
ZIP INTEGER) "
+                            + tableDDLOptions;
+            conn.createStatement().execute(stmString1);
+            String upsertStmt = "UPSERT INTO " + dataTableFullName + " 
VALUES(?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(upsertStmt);
+            setEveryNthRowWithNull(NROWS, 2, stmt);
+            conn.commit();
+            setEveryNthRowWithNull(NROWS, 3, stmt);
+            conn.commit();
+            String stmtString2 =
+                    String.format(
+                            "CREATE %s INDEX %s ON %s (VAL) INCLUDE (ZIP) 
ASYNC ",
+                            (localIndex ? "LOCAL" : ""), indexTableName, 
dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+            // Run the index MR job and verify that the index table is built 
correctly
+            IndexTool indexTool = runIndexTool(directApi, useSnapshot, 
schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+            assertEquals(NROWS, 
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+
+            // Repeat the test with compaction
+            setEveryNthRowWithNull(NROWS, 5, stmt);
+            conn.commit();
+            setEveryNthRowWithNull(NROWS, 7, stmt);
+            conn.commit();
+            TestUtil.doMajorCompaction(conn, dataTableFullName);
+            // Run the index MR job and verify that the index table is built 
correctly
+            indexTool = runIndexTool(directApi, useSnapshot, schemaName, 
dataTableName, indexTableName, null, 0, new String[0]);
+            assertEquals(NROWS, 
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+        }
+    }
+
+
     @Test
     public void testBuildSecondaryIndexAndScrutinize() throws Exception {
         // This test is for building non-transactional global indexes with 
direct api
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 7d1c1b4..8035982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -89,7 +89,12 @@ public class ServerBuildIndexCompiler {
 
     public MutationPlan compile(PTable index) throws SQLException {
         try (final PhoenixStatement statement = new 
PhoenixStatement(connection)) {
-            String query = "SELECT count(*) FROM " + tableName;
+            String query;
+            if (index.getIndexType() == PTable.IndexType.LOCAL) {
+                query = "SELECT count(*) FROM " + tableName;
+            } else {
+                query = "SELECT * FROM " + tableName;
+            }
             this.plan = statement.compileQuery(query);
             TableRef tableRef = plan.getTableRef();
             Scan scan = plan.getContext().getScan();
@@ -110,26 +115,28 @@ public class ServerBuildIndexCompiler {
             // the index table possibly remotely
             if (index.getIndexType() == PTable.IndexType.LOCAL) {
                 
scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, 
ByteUtil.copyKeyBytesIfNecessary(ptr));
+                // By default, we'd use a FirstKeyOnly filter as nothing else 
needs to be projected for count(*).
+                // However, in this case, we need to project all of the data 
columns that contribute to the index.
+                IndexMaintainer indexMaintainer = 
index.getIndexMaintainer(dataTable, connection);
+                for (ColumnReference columnRef : 
indexMaintainer.getAllColumns()) {
+                    if (index.getImmutableStorageScheme() == 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                        scan.addFamily(columnRef.getFamily());
+                    } else {
+                        scan.addColumn(columnRef.getFamily(), 
columnRef.getQualifier());
+                    }
+                }
             } else {
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
ByteUtil.copyKeyBytesIfNecessary(ptr));
                 scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, 
TRUE_BYTES);
                 ScanUtil.setClientVersion(scan, 
MetaDataProtocol.PHOENIX_VERSION);
-            }
-            // By default, we'd use a FirstKeyOnly filter as nothing else 
needs to be projected for count(*).
-            // However, in this case, we need to project all of the data 
columns that contribute to the index.
-            IndexMaintainer indexMaintainer = 
index.getIndexMaintainer(dataTable, connection);
-            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
-                if (index.getImmutableStorageScheme() == 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                    scan.addFamily(columnRef.getFamily());
-                } else {
-                    scan.addColumn(columnRef.getFamily(), 
columnRef.getQualifier());
-                }
+                scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, 
TRUE_BYTES);
             }
 
             if (dataTable.isTransactional()) {
                 scan.setAttribute(BaseScannerRegionObserver.TX_STATE, 
connection.getMutationState().encodeTransaction());
             }
 
+
             // Go through MutationPlan abstraction so that we can create local 
indexes
             // with a connectionless connection (which makes testing easier).
             return new RowCountMutationPlan(plan.getContext(), 
PhoenixStatement.Operation.UPSERT);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d45f047..fc7a81e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1100,6 +1100,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         boolean hasMore;
         int rowCount = 0;
+        RegionScanner newScanner;
+        if (!scan.isRaw()) {
+            // We need to use raw scan here to replay delete markers too 
(PHOENIX-5535)
+            scan.getFamilyMap().clear();
+            scan.setRaw(true);
+            scan.setCacheBlocks(false);
+            scan.readAllVersions();
+            newScanner = region.getScanner(scan);
+        }
+        else {
+            // The scan is already raw, so do not do anything
+            newScanner = innerScanner;
+        }
+
         try {
             int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
             long maxBatchSizeBytes = 
config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
@@ -1111,7 +1125,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             synchronized (innerScanner) {
                 do {
                     List<Cell> results = new ArrayList<Cell>();
-                    hasMore = innerScanner.nextRaw(results);
+                    hasMore =  newScanner.nextRaw(results);
                     if (!results.isEmpty()) {
                         Put put = null;
                         Delete del = null;
@@ -1164,6 +1178,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             LOGGER.error("IOException during rebuilding: " + 
Throwables.getStackTraceAsString(e));
             throw e;
         } finally {
+            if (newScanner != innerScanner) {
+                newScanner.close();
+            }
             region.closeRegionOperation();
         }
         byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index dd95c8e..bb4e1b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -299,16 +299,18 @@ public class GlobalIndexChecker implements RegionCoprocessor, RegionObserver {
                 buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
                 
buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, 
TRUE_BYTES);
                 
buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK,
 Bytes.toBytes(true));
+                // We want delete markers to be replayed during index rebuild.
+                buildIndexScan.setRaw(true);
+                buildIndexScan.setCacheBlocks(false);
+                buildIndexScan.readAllVersions();
+                buildIndexScan.setTimeRange(0, maxTimestamp);
             }
             // Rebuild the index row from the corresponding the row in the the 
data table
             // Get the data row key from the index row key
             byte[] dataRowKey = indexMaintainer.buildDataRowKey(new 
ImmutableBytesWritable(indexRowKey), viewConstants);
             buildIndexScan.withStartRow(dataRowKey, true);
             buildIndexScan.withStopRow(dataRowKey, true);
-            buildIndexScan.setTimeRange(0, maxTimestamp);
-            // If the data table row has been deleted then we want to delete 
the corresponding index row too.
-            // Thus, we are using a raw scan
-            buildIndexScan.setRaw(true);
+
             try (ResultScanner resultScanner = 
dataHTable.getScanner(buildIndexScan)){
                 resultScanner.next();
             } catch (Throwable t) {

Reply via email to