This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new 28ab1b3 PHOENIX-5535 Index rebuilds via
UngroupedAggregateRegionObserver should replay delete markers
28ab1b3 is described below
commit 28ab1b3a15bd8d6d83ccf9b25ef3a2ef4843b18f
Author: Kadir <[email protected]>
AuthorDate: Sun Oct 20 17:40:35 2019 -0700
PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should
replay delete markers
Further changes
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 67 ++++++++++++++++++++++
.../phoenix/compile/ServerBuildIndexCompiler.java | 29 ++++++----
.../UngroupedAggregateRegionObserver.java | 19 +++++-
.../apache/phoenix/index/GlobalIndexChecker.java | 10 ++--
4 files changed, 109 insertions(+), 16 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 2f12ae9..69356e8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -254,6 +255,72 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ private void setEveryNthRowWithNull(int nrows, int nthRowNull,
PreparedStatement stmt) throws Exception {
+ for (int i = 0; i < nrows; i++) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, i * 10);
+ if (i % nthRowNull != 0) {
+ stmt.setInt(3, 9000 + i * nthRowNull);
+ } else {
+ stmt.setNull(3, Types.INTEGER);
+ }
+ stmt.execute();
+ }
+ }
+
+ @Test
+ public void testWithSetNull() throws Exception {
+ // This test is for building non-transactional global indexes with direct api
+ if (localIndex || transactional) {
+ return;
+ }
+ // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
+ // and after that the index table is still rebuilt correctly
+ final int NROWS = 2 * 3 * 5 * 7;
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName,
dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName,
indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String stmString1 =
+ "CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL INTEGER,
ZIP INTEGER) "
+ + tableDDLOptions;
+ conn.createStatement().execute(stmString1);
+ String upsertStmt = "UPSERT INTO " + dataTableFullName + "
VALUES(?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(upsertStmt);
+ setEveryNthRowWithNull(NROWS, 2, stmt);
+ conn.commit();
+ setEveryNthRowWithNull(NROWS, 3, stmt);
+ conn.commit();
+ String stmtString2 =
+ String.format(
+ "CREATE %s INDEX %s ON %s (VAL) INCLUDE (ZIP)
ASYNC ",
+ (localIndex ? "LOCAL" : ""), indexTableName,
dataTableFullName);
+ conn.createStatement().execute(stmtString2);
+ // Run the index MR job and verify that the index table is built correctly
+ IndexTool indexTool = runIndexTool(directApi, useSnapshot,
schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+ assertEquals(NROWS,
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn,
dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+
+ // Repeat the test with compaction
+ setEveryNthRowWithNull(NROWS, 5, stmt);
+ conn.commit();
+ setEveryNthRowWithNull(NROWS, 7, stmt);
+ conn.commit();
+ TestUtil.doMajorCompaction(conn, dataTableFullName);
+ // Run the index MR job and verify that the index table is built correctly
+ indexTool = runIndexTool(directApi, useSnapshot, schemaName,
dataTableName, indexTableName, null, 0, new String[0]);
+ assertEquals(NROWS,
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ actualRowCount = IndexScrutiny.scrutinizeIndex(conn,
dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ }
+ }
+
+
@Test
public void testBuildSecondaryIndexAndScrutinize() throws Exception {
// This test is for building non-transactional global indexes with direct api
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 7d1c1b4..8035982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -89,7 +89,12 @@ public class ServerBuildIndexCompiler {
public MutationPlan compile(PTable index) throws SQLException {
try (final PhoenixStatement statement = new
PhoenixStatement(connection)) {
- String query = "SELECT count(*) FROM " + tableName;
+ String query;
+ if (index.getIndexType() == PTable.IndexType.LOCAL) {
+ query = "SELECT count(*) FROM " + tableName;
+ } else {
+ query = "SELECT * FROM " + tableName;
+ }
this.plan = statement.compileQuery(query);
TableRef tableRef = plan.getTableRef();
Scan scan = plan.getContext().getScan();
@@ -110,26 +115,28 @@ public class ServerBuildIndexCompiler {
// the index table possibly remotely
if (index.getIndexType() == PTable.IndexType.LOCAL) {
scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO,
ByteUtil.copyKeyBytesIfNecessary(ptr));
+ // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+ // However, in this case, we need to project all of the data columns that contribute to the index.
+ IndexMaintainer indexMaintainer =
index.getIndexMaintainer(dataTable, connection);
+ for (ColumnReference columnRef :
indexMaintainer.getAllColumns()) {
+ if (index.getImmutableStorageScheme() ==
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ scan.addFamily(columnRef.getFamily());
+ } else {
+ scan.addColumn(columnRef.getFamily(),
columnRef.getQualifier());
+ }
+ }
} else {
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
ByteUtil.copyKeyBytesIfNecessary(ptr));
scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES,
TRUE_BYTES);
ScanUtil.setClientVersion(scan,
MetaDataProtocol.PHOENIX_VERSION);
- }
- // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
- // However, in this case, we need to project all of the data columns that contribute to the index.
- IndexMaintainer indexMaintainer =
index.getIndexMaintainer(dataTable, connection);
- for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
- if (index.getImmutableStorageScheme() ==
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
- scan.addFamily(columnRef.getFamily());
- } else {
- scan.addColumn(columnRef.getFamily(),
columnRef.getQualifier());
- }
+ scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG,
TRUE_BYTES);
}
if (dataTable.isTransactional()) {
scan.setAttribute(BaseScannerRegionObserver.TX_STATE,
connection.getMutationState().encodeTransaction());
}
+
// Go through MutationPlan abstraction so that we can create local indexes
// with a connectionless connection (which makes testing easier).
return new RowCountMutationPlan(plan.getContext(),
PhoenixStatement.Operation.UPSERT);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 3a03f94..1c9b7b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1069,6 +1069,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] clientVersionBytes =
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
boolean hasMore;
int rowCount = 0;
+ RegionScanner newScanner;
+ if (!scan.isRaw()) {
+ // We need to use raw scan here to replay delete markers too (PHOENIX-5535)
+ scan.getFamilyMap().clear();
+ scan.setRaw(true);
+ scan.setCacheBlocks(false);
+ scan.setMaxVersions();
+ newScanner = region.getScanner(scan);
+ }
+ else {
+ // The scan is already raw, so do not do anything
+ newScanner = innerScanner;
+ }
+
try {
int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
long maxBatchSizeBytes =
config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
@@ -1080,7 +1094,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
synchronized (innerScanner) {
do {
List<Cell> results = new ArrayList<Cell>();
- hasMore = innerScanner.nextRaw(results);
+ hasMore = newScanner.nextRaw(results);
if (!results.isEmpty()) {
Put put = null;
Delete del = null;
@@ -1133,6 +1147,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
LOGGER.error("IOException during rebuilding: " +
Throwables.getStackTraceAsString(e));
throw e;
} finally {
+ if (newScanner != innerScanner) {
+ newScanner.close();
+ }
region.closeRegionOperation();
}
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 1b3b9c3..2002aff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -299,16 +299,18 @@ public class GlobalIndexChecker extends BaseRegionObserver {
buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES,
TRUE_BYTES);
buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK,
Bytes.toBytes(true));
+ // We want delete markers to be replayed during index rebuild.
+ buildIndexScan.setRaw(true);
+ buildIndexScan.setCacheBlocks(false);
+ buildIndexScan.setMaxVersions();
+ buildIndexScan.setTimeRange(0, maxTimestamp);
}
// Rebuild the index row from the corresponding the row in the the
data table
// Get the data row key from the index row key
byte[] dataRowKey = indexMaintainer.buildDataRowKey(new
ImmutableBytesWritable(indexRowKey), viewConstants);
buildIndexScan.withStartRow(dataRowKey, true);
buildIndexScan.withStopRow(dataRowKey, true);
- buildIndexScan.setTimeRange(0, maxTimestamp);
- // If the data table row has been deleted then we want to delete the corresponding index row too.
- // Thus, we are using a raw scan
- buildIndexScan.setRaw(true);
+
try (ResultScanner resultScanner =
dataHTable.getScanner(buildIndexScan)){
resultScanner.next();
} catch (Throwable t) {