This is an automated email from the ASF dual-hosted git repository.
larsh pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new daa6816 Local indexes get out of sync after changes for global
consistent indexes.
daa6816 is described below
commit daa6816dcb3ac035bf8553e6bf2ff8a18e80e6e4
Author: Lars <[email protected]>
AuthorDate: Sat Aug 22 11:55:24 2020 -0700
Local indexes get out of sync after changes for global consistent indexes.
---
.../apache/phoenix/end2end/index/LocalIndexIT.java | 33 ++++++++++
.../phoenix/hbase/index/IndexRegionObserver.java | 70 ++++++++++++----------
2 files changed, 71 insertions(+), 32 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 481ce1c..012bbca 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -102,6 +102,39 @@ public class LocalIndexIT extends BaseLocalIndexIT {
}
@Test
+ public void testLocalIndexConsistency() throws Exception {
+ if (isNamespaceMapped) {
+ return;
+ }
+ String tableName = schemaName + "." + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+
+ Connection conn = getConnection();
+ conn.setAutoCommit(true);
+
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (pk
INTEGER PRIMARY KEY, v1 FLOAT) SPLIT ON (2000)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + "
ON " + tableName + "(v1)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + "
VALUES(rand() * 4000, rand())");
+
+ ResultSet rs;
+ for (int i=0; i<15; i++) {
+ conn.createStatement().execute("UPSERT INTO " + tableName + "
SELECT rand() * 4000, rand() FROM " + tableName);
+
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " +
tableName);
+ rs.next();
+ int indexCount = rs.getInt(1);
+ rs.close();
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */
COUNT(*) FROM " + tableName);
+ rs.next();
+ int tableCount = rs.getInt(1);
+ rs.close();
+
+ assertEquals(indexCount, tableCount);
+ }
+ }
+
+ @Test
public void testUseUncoveredLocalIndexWithPrefix() throws Exception {
String tableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e24b8e2..49b5509 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -685,7 +685,7 @@ public class IndexRegionObserver extends BaseRegionObserver
{
* unverified status. In phase 2, data table mutations are applied. In
phase 3, the status for an index table row is
* either set to "verified" or the row is deleted.
*/
- private void
preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+ private boolean
preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp,
BatchMutateContext context,
Collection<? extends Mutation>
pendingMutations,
@@ -699,13 +699,6 @@ public class IndexRegionObserver extends
BaseRegionObserver {
current = NullSpan.INSTANCE;
}
current.addTimelineAnnotation("Built index updates, doing
preStep");
- // Handle local index updates
- for (IndexMaintainer indexMaintainer : maintainers) {
- if (indexMaintainer.isLocalIndex()) {
- handleLocalIndexUpdates(c, miniBatchOp, pendingMutations,
indexMetaData);
- break;
- }
- }
// The rest of this method is for handling global index updates
context.indexUpdates =
ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
prepareIndexMutations(context, maintainers, now);
@@ -713,6 +706,9 @@ public class IndexRegionObserver extends BaseRegionObserver
{
context.preIndexUpdates =
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
int updateCount = 0;
for (IndexMaintainer indexMaintainer : maintainers) {
+ if (indexMaintainer.isLocalIndex()) {
+ continue;
+ }
updateCount++;
byte[] emptyCF =
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
@@ -734,6 +730,7 @@ public class IndexRegionObserver extends BaseRegionObserver
{
}
}
TracingUtils.addAnnotation(current, "index update count",
updateCount);
+ return updateCount != 0;
}
}
@@ -779,11 +776,22 @@ public class IndexRegionObserver extends
BaseRegionObserver {
return true;
}
- private void preparePostIndexMutations(BatchMutateContext context, long
now, PhoenixIndexMetaData indexMetaData,
- String tableName)
+ private void
preparePostIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+
MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context,
+ Collection<? extends Mutation>
pendingMutations,
+ long now,
+ PhoenixIndexMetaData indexMetaData)
throws Throwable {
context.postIndexUpdates =
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
List<IndexMaintainer> maintainers =
indexMetaData.getIndexMaintainers();
+ // Handle local index updates
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ if (indexMaintainer.isLocalIndex()) {
+ handleLocalIndexUpdates(c, miniBatchOp, pendingMutations,
indexMetaData);
+ break;
+ }
+ }
// Check if we need to skip post index update for any of the rows
for (IndexMaintainer indexMaintainer : maintainers) {
byte[] emptyCF =
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
@@ -820,6 +828,7 @@ public class IndexRegionObserver extends BaseRegionObserver
{
rowLock.release();
}
context.rowLocks.clear();
+ String tableName =
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
throw new IOException("One of the concurrent mutations
does not have all indexed columns. " +
"The batch needs to be retried " + tableName);
}
@@ -861,30 +870,27 @@ public class IndexRegionObserver extends
BaseRegionObserver {
return;
}
long start = EnvironmentEdgeManager.currentTimeMillis();
- preparePreIndexMutations(c, miniBatchOp, context, mutations, now,
indexMetaData);
+ boolean hasGlobalIndex = preparePreIndexMutations(c, miniBatchOp,
context, mutations, now, indexMetaData);
metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis()
- start);
- // Sleep for one millisecond if we have prepared the index updates in
less than 1 ms. The sleep is necessary to
- // get different timestamps for concurrent batches that share common
rows. It is very rare that the index updates
- // can be prepared in less than one millisecond
- if (!context.rowLocks.isEmpty() && now ==
EnvironmentEdgeManager.currentTimeMillis()) {
- Thread.sleep(1);
- LOG.debug("slept 1ms for " +
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
- }
- // Release the locks before making RPC calls for index updates
- for (RowLock rowLock : context.rowLocks) {
- rowLock.release();
- }
- // Do the first phase index updates
- doPre(c, context, miniBatchOp);
- // Acquire the locks again before letting the region proceed with data
table updates
- List<RowLock> rowLocks =
Lists.newArrayListWithExpectedSize(context.rowLocks.size());
- for (RowLock rowLock : context.rowLocks) {
- rowLocks.add(lockManager.lockRow(rowLock.getRowKey(),
rowLockWaitDuration));
+ if (hasGlobalIndex) {
+ // Sleep for one millisecond if we have prepared the index updates
in less than 1 ms. The sleep is necessary to
+ // get different timestamps for concurrent batches that share
common rows. It is very rare that the index updates
+ // can be prepared in less than one millisecond
+ if (!context.rowLocks.isEmpty() && now ==
EnvironmentEdgeManager.currentTimeMillis()) {
+ Thread.sleep(1);
+ LOG.debug("slept 1ms for " +
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ }
+ // Release the locks before making RPC calls for index updates
+ for (RowLock rowLock : context.rowLocks) {
+ rowLock.release();
+ }
+ // Do the first phase index updates
+ doPre(c, context, miniBatchOp);
+ // Acquire the locks again before letting the region proceed with
data table updates
+ context.rowLocks.clear();
+ lockRows(context);
}
- context.rowLocks.clear();
- context.rowLocks = rowLocks;
- preparePostIndexMutations(context, now, indexMetaData,
-
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ preparePostIndexMutations(c, miniBatchOp, context, mutations, now,
indexMetaData);
if (failDataTableUpdatesForTesting) {
throw new DoNotRetryIOException("Simulating the data table write
failure");
}