This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
     new d1bbd14fdd PHOENIX-7709 Index committer post writer lazy mode - Async RPC call for verified index mutations (#2297)
d1bbd14fdd is described below
commit d1bbd14fdd91eecbd609fc7032612f4c9023fc25
Author: Viraj Jasani <[email protected]>
AuthorDate: Thu Oct 9 14:14:00 2025 -0700
    PHOENIX-7709 Index committer post writer lazy mode - Async RPC call for verified index mutations (#2297)
---
.../hbase/index/parallel/BaseTaskRunner.java | 7 +++
.../phoenix/hbase/index/parallel/TaskRunner.java | 19 ++++++++
.../AbstractParallelWriterIndexCommitter.java | 35 +++++++++++++-
.../write/LazyParallelWriterIndexCommitter.java | 22 +++++++--
.../index/write/ParallelWriterIndexCommitter.java | 13 +-----
.../java/org/apache/phoenix/end2end/Bson4IT.java | 25 +++++++++-
.../end2end/ConcurrentMutationsExtendedIT.java | 2 +-
.../ConcurrentMutationsLazyPostBatchWriteIT.java | 53 ++++++++++++++++++++++
8 files changed, 156 insertions(+), 20 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
index 50b105ecc2..73048ee5ed 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
@@ -116,6 +116,13 @@ public abstract class BaseTaskRunner implements TaskRunner {
     throw new EarlyExitFailure("Interrupted and stopped before computation was complete!");
   }

+  @Override
+  public <R> void submitOnly(TaskBatch<R> tasks) {
+    for (Task<R> task : tasks.getTasks()) {
+      this.writerPool.submit(task);
+    }
+  }
+
   @Override
   public void stop(String why) {
     if (this.stopped) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
index a74c7c087e..f0638110a8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
@@ -60,4 +60,23 @@ public interface TaskRunner extends Stoppable {
    */
   public <R> Pair<List<R>, List<Future<R>>> submitUninterruptible(TaskBatch<R> tasks)
     throws EarlyExitFailure, ExecutionException;
+
+  /**
+   * Submit the given tasks to the pool without waiting for them to complete or collecting results.
+   * This is a fire-and-forget operation that allows tasks to run asynchronously in the background.
+   * <p>
+   * Unlike {@link #submit(TaskBatch)} and {@link #submitUninterruptible(TaskBatch)}, this method
+   * does not block waiting for task completion and does not return results or futures. It is useful
+   * for scenarios where you want to initiate background processing but don't need to wait for or
+   * collect the results.
+   * <p>
+   * Tasks are submitted to the underlying thread pool and will execute according to the pool's
+   * scheduling policy. If any task fails during execution, the failure will be handled internally
+   * and will not propagate back to the caller since no results are collected.
+   * @param <R>   the type of result that would be returned by the tasks (unused since no results
+   *              are collected)
+   * @param tasks the batch of tasks to submit for asynchronous execution
+   * @throws ExecutionException if there is an error submitting the tasks to the thread pool
+   */
+  <R> void submitOnly(TaskBatch<R> tasks) throws ExecutionException;
 }
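
A minimal caller sketch (hypothetical helper, not part of this commit) contrasting the two submission paths on a TaskRunner; it assumes a TaskBatch<Void> already populated with index write tasks, as the committer below builds:

    // Illustrative only; flushIndexUpdates is an assumed name, not Phoenix API.
    void flushIndexUpdates(TaskRunner pool, TaskBatch<Void> tasks, boolean lazy)
      throws EarlyExitFailure, ExecutionException {
      if (lazy) {
        // Fire-and-forget: returns as soon as the tasks are handed to the pool;
        // task failures are not surfaced to this caller.
        pool.submitOnly(tasks);
      } else {
        // Blocking: waits for every task and rethrows the first failure.
        pool.submitUninterruptible(tasks);
      }
    }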
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
index a090a1b59a..c88712f259 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
@@ -63,7 +63,6 @@ public abstract class AbstractParallelWriterIndexCommitter implements IndexCommi
   protected QuickFailingTaskRunner pool;
   protected KeyValueBuilder kvBuilder;
   protected RegionCoprocessorEnvironment env;
-  protected TaskBatch<Void> tasks;
   protected boolean disableIndexOnFailure = false;

   // This relies on Hadoop Configuration to handle warning about deprecated configs and
@@ -116,6 +115,39 @@ public abstract class AbstractParallelWriterIndexCommitter implements IndexCommi
   public void write(Multimap<HTableInterfaceReference, Mutation> toWrite,
     final boolean allowLocalUpdates, final int clientVersion)
     throws SingleIndexWriteFailureException {
+    TaskBatch<Void> tasks = new TaskBatch<>(toWrite.asMap().size());
+    addTasks(toWrite, allowLocalUpdates, clientVersion, tasks);
+    submitTasks(tasks);
+  }
+
+  /**
+   * Submits the provided task batch for execution. This method defines the task submission strategy
+   * and must be implemented by concrete subclasses to specify whether tasks should be executed
+   * synchronously (blocking until completion) or asynchronously (fire-and-forget).
+   * @param tasks the batch of index write tasks to submit for execution
+   * @throws SingleIndexWriteFailureException if there is an error during task submission or
+   *                                          execution (implementation-dependent)
+   */
+  protected abstract void submitTasks(TaskBatch<Void> tasks)
+    throws SingleIndexWriteFailureException;
+
+  /**
+   * Adds parallel index write tasks to the provided task batch for execution across multiple index
+   * tables. Each index table gets its own task that will be executed in parallel to optimize write
+   * performance.
+   * @param toWrite           a multimap containing index table references as keys and their
+   *                          corresponding mutations as values. Each table will get its own
+   *                          parallel task.
+   * @param allowLocalUpdates if false, skips creating tasks for writes to the same table as the
+   *                          current region to prevent potential deadlocks
+   * @param clientVersion     the Phoenix client version, used for compatibility checks and
+   *                          version-specific behavior in the index write operations
+   * @param tasks             the task batch to which the newly created index write tasks will be
+   *                          added. This batch needs to be submitted for parallel execution by the
+   *                          caller.
+   */
+  private void addTasks(Multimap<HTableInterfaceReference, Mutation> toWrite,
+    boolean allowLocalUpdates, int clientVersion, TaskBatch<Void> tasks) {
    /*
     * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
     * writes in parallel to each index table, so each table gets its own task and is submitted to
@@ -128,7 +160,6 @@ public abstract class AbstractParallelWriterIndexCommitter implements IndexCommi
     */
    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries =
      toWrite.asMap().entrySet();
-    tasks = new TaskBatch<Void>(entries.size());
    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
      // get the mutations for each table. We leak the implementation here a little bit to save
      // doing a complete copy over of all the index update for each table.
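
With this refactor, write() becomes a template method: it builds the batch via addTasks(...) and delegates the submission policy to submitTasks(...). A hypothetical subclass sketch of that contract (the two real strategies follow in the next two diffs):

    // Illustrative only; MyBlockingIndexCommitter is not part of this commit.
    public class MyBlockingIndexCommitter extends AbstractParallelWriterIndexCommitter {
      @Override
      protected void submitTasks(TaskBatch<Void> tasks) throws SingleIndexWriteFailureException {
        try {
          // Block until every index table's task has finished or failed.
          pool.submitUninterruptible(tasks);
        } catch (Exception e) {
          propagateFailure(e.getCause());
        }
      }
    }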
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
index ddbf9f003e..9ac746dab3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
@@ -17,17 +17,31 @@
  */
 package org.apache.phoenix.hbase.index.write;

+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Like the {@link ParallelWriterIndexCommitter}, but does not block
  */
 public class LazyParallelWriterIndexCommitter extends AbstractParallelWriterIndexCommitter {
-  // for testing
-  public LazyParallelWriterIndexCommitter(String hbaseVersion) {
-    super(hbaseVersion);
-  }
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(LazyParallelWriterIndexCommitter.class);

   public LazyParallelWriterIndexCommitter() {
     super();
   }
+
+  @Override
+  protected void submitTasks(TaskBatch<Void> tasks) throws SingleIndexWriteFailureException {
+    try {
+      pool.submitOnly(tasks);
+    } catch (Exception e) {
+      LOGGER.error("Error while submitting the task.", e);
+      propagateFailure(e.getCause());
+    }
+  }
+
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index cbffbc6579..18a25ae020 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -18,15 +18,12 @@
 package org.apache.phoenix.hbase.index.write;

 import java.util.concurrent.ExecutionException;

-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.phoenix.thirdparty.com.google.common.collect.Multimap;
-
 /**
  * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
  * any of the index updates fails. Completion is determined by the following criteria: *
@@ -49,12 +46,7 @@ public class ParallelWriterIndexCommitter extends AbstractParallelWriterIndexCom
   }

   @Override
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite,
-    final boolean allowLocalUpdates, final int clientVersion)
-    throws SingleIndexWriteFailureException {
-
-    super.write(toWrite, allowLocalUpdates, clientVersion);
-    // actually submit the tasks to the pool and wait for them to finish/fail
+  protected void submitTasks(TaskBatch<Void> tasks) throws SingleIndexWriteFailureException {
     try {
       pool.submitUninterruptible(tasks);
     } catch (EarlyExitFailure e) {
@@ -63,6 +55,5 @@ public class ParallelWriterIndexCommitter extends AbstractParallelWriterIndexCom
       LOGGER.error("Found a failed index update!");
       propagateFailure(e.getCause());
     }
-
   }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
index 8d11748ead..103f0e6df5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
@@ -44,6 +44,9 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
+import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSourceImpl;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.schema.types.PDouble;
@@ -68,7 +71,7 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
/**
* Tests for BSON.
*/
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class Bson4IT extends ParallelStatsDisabledIT {
@@ -291,7 +294,8 @@ public class Bson4IT extends ParallelStatsDisabledIT {
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
       String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
-        + " CONSTRAINT pk PRIMARY KEY(PK1))";
+        + " CONSTRAINT pk PRIMARY KEY(PK1))"
+        + " \"org.apache.hadoop.hbase.index.lazy.post_batch.write\"=true";

       final String indexDdl1;
       if (!this.coveredIndex) {
@@ -367,6 +371,7 @@ public class Bson4IT extends ParallelStatsDisabledIT {
       stmt.executeUpdate();
       conn.commit();

+      Thread.sleep(500);
       ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tableName);
       assertTrue(rs.next());
@@ -450,9 +455,25 @@ public class Bson4IT extends ParallelStatsDisabledIT {
       assertFalse(rs.next());
       validateExplainPlan(ps, tableName, "FULL SCAN ");
+      verifyNoReadRepair();
     }
   }
+  private void verifyNoReadRepair() {
+    GlobalIndexCheckerSourceImpl indexCheckerSource =
+      (GlobalIndexCheckerSourceImpl) MetricsIndexerSourceFactory.getInstance()
+        .getGlobalIndexCheckerSource();
+
+    long indexRepairs = indexCheckerSource.getMetricsRegistry()
+      .getCounter(GlobalIndexCheckerSource.INDEX_REPAIR, 0).value();
+    assertEquals("No index repairs should occur during test execution", 0, indexRepairs);
+
+    long indexRepairFailures = indexCheckerSource.getMetricsRegistry()
+      .getCounter(GlobalIndexCheckerSource.INDEX_REPAIR_FAILURE, 0).value();
+    assertEquals("No index repair failures should occur during test execution", 0,
+      indexRepairFailures);
+  }
+
+
   @Test
   public void testBsonValueFunctionWithBSONType() throws Exception {
     Assume.assumeTrue(this.coveredIndex && this.columnEncoded); // Since indexing on BSON not
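
A sketch of enabling lazy post batch write per table, mirroring the modified DDL above (assumes a running Phoenix cluster; jdbcUrl and the table name are placeholders):

    try (Connection conn = DriverManager.getConnection(jdbcUrl);
         Statement stmt = conn.createStatement()) {
      // The quoted HBase-scoped table property enables the async post-write
      // path for this table's verified index mutations.
      stmt.execute("CREATE TABLE T1 (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
        + " CONSTRAINT pk PRIMARY KEY(PK1))"
        + " \"org.apache.hadoop.hbase.index.lazy.post_batch.write\"=true");
    }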
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 69b81826d5..03ca9e1a68 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -68,7 +68,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
private static final int ROW_LOCK_WAIT_TIME = 10000;
- private static final int MAX_LOOKBACK_AGE = 1000000;
+ protected static final int MAX_LOOKBACK_AGE = 1000000;
private final Object lock = new Object();
public ConcurrentMutationsExtendedIT(boolean uncovered) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
new file mode 100644
index 0000000000..761c8658eb
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.util.Map;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch write enabled.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentMutationsLazyPostBatchWriteIT extends ConcurrentMutationsExtendedIT {
+
+  public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered) {
+    super(uncovered);
+    Assume.assumeFalse("Only covered index supports lazy post batch write mode", uncovered);
+  }
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+    props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+    props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
+    props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Integer.toString(MAX_LOOKBACK_AGE));
+    props.put("hbase.rowlock.wait.duration", "100");
+    props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+}
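
For completeness, a hedged sketch of enabling the mode cluster-wide rather than per table, as the test setup above does; it assumes IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE resolves to the same "org.apache.hadoop.hbase.index.lazy.post_batch.write" key used in the Bson4IT DDL:

    // Illustrative server-side configuration; verify the key against your
    // Phoenix version before relying on it.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, true);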