This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.3 by this push:
     new d1bbd14fdd PHOENIX-7709 Index committer post writer lazy mode - Async RPC call for verified index mutations (#2297)
d1bbd14fdd is described below

commit d1bbd14fdd91eecbd609fc7032612f4c9023fc25
Author: Viraj Jasani <[email protected]>
AuthorDate: Thu Oct 9 14:14:00 2025 -0700

    PHOENIX-7709 Index committer post writer lazy mode - Async RPC call for verified index mutations (#2297)
---
 .../hbase/index/parallel/BaseTaskRunner.java       |  7 +++
 .../phoenix/hbase/index/parallel/TaskRunner.java   | 19 ++++++++
 .../AbstractParallelWriterIndexCommitter.java      | 35 +++++++++++++-
 .../write/LazyParallelWriterIndexCommitter.java    | 22 +++++++--
 .../index/write/ParallelWriterIndexCommitter.java  | 13 +-----
 .../java/org/apache/phoenix/end2end/Bson4IT.java   | 25 +++++++++-
 .../end2end/ConcurrentMutationsExtendedIT.java     |  2 +-
 .../ConcurrentMutationsLazyPostBatchWriteIT.java   | 53 ++++++++++++++++++++++
 8 files changed, 156 insertions(+), 20 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
index 50b105ecc2..73048ee5ed 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
@@ -116,6 +116,13 @@ public abstract class BaseTaskRunner implements TaskRunner {
     throw new EarlyExitFailure("Interrupted and stopped before computation was complete!");
   }
 
+  @Override
+  public <R> void submitOnly(TaskBatch<R> tasks) {
+    for (Task<R> task : tasks.getTasks()) {
+      this.writerPool.submit(task);
+    }
+  }
+
   @Override
   public void stop(String why) {
     if (this.stopped) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
index a74c7c087e..f0638110a8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
@@ -60,4 +60,23 @@ public interface TaskRunner extends Stoppable {
    */
   public <R> Pair<List<R>, List<Future<R>>> submitUninterruptible(TaskBatch<R> tasks)
     throws EarlyExitFailure, ExecutionException;
+
+  /**
+   * Submit the given tasks to the pool without waiting for them to complete or collecting results.
+   * This is a fire-and-forget operation that allows tasks to run asynchronously in the background.
+   * <p>
+   * Unlike {@link #submit(TaskBatch)} and {@link #submitUninterruptible(TaskBatch)}, this method
+   * does not block waiting for task completion and does not return results or futures. It is useful
+   * for scenarios where you want to initiate background processing but don't need to wait for or
+   * collect the results.
+   * <p>
+   * Tasks are submitted to the underlying thread pool and will execute according to the pool's
+   * scheduling policy. If any task fails during execution, the failure will be handled internally
+   * and will not propagate back to the caller since no results are collected.
+   * @param <R>   the type of result that would be returned by the tasks (unused since no results
+   *              are collected)
+   * @param tasks the batch of tasks to submit for asynchronous execution
+   * @throws ExecutionException if there is an error submitting the tasks to the thread pool
+   */
+  <R> void submitOnly(TaskBatch<R> tasks) throws ExecutionException;
 }
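
As a usage note for reviewers: a minimal caller-side sketch of the new submitOnly() contract, assuming a TaskRunner is already wired up (the anonymous Task body here is illustrative, not from this commit):

    import java.util.concurrent.ExecutionException;
    import org.apache.phoenix.hbase.index.parallel.Task;
    import org.apache.phoenix.hbase.index.parallel.TaskBatch;
    import org.apache.phoenix.hbase.index.parallel.TaskRunner;

    class SubmitOnlySketch {
      // Fire-and-forget: build a batch, hand it to the pool, and return
      // without blocking on results or collecting futures.
      static void fireAndForget(TaskRunner runner) throws ExecutionException {
        TaskBatch<Void> batch = new TaskBatch<>(1);
        batch.add(new Task<Void>() {
          @Override
          public Void call() throws Exception {
            // e.g. write verified index mutations to one index table
            return null;
          }
        });
        runner.submitOnly(batch); // returns immediately, unlike submit()
      }
    }
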
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
index a090a1b59a..c88712f259 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
@@ -63,7 +63,6 @@ public abstract class AbstractParallelWriterIndexCommitter implements IndexCommi
   protected QuickFailingTaskRunner pool;
   protected KeyValueBuilder kvBuilder;
   protected RegionCoprocessorEnvironment env;
-  protected TaskBatch<Void> tasks;
   protected boolean disableIndexOnFailure = false;
 
   // This relies on Hadoop Configuration to handle warning about deprecated configs and
@@ -116,6 +115,39 @@ public abstract class AbstractParallelWriterIndexCommitter implements IndexCommi
   public void write(Multimap<HTableInterfaceReference, Mutation> toWrite,
     final boolean allowLocalUpdates, final int clientVersion)
     throws SingleIndexWriteFailureException {
+    TaskBatch<Void> tasks = new TaskBatch<>(toWrite.asMap().size());
+    addTasks(toWrite, allowLocalUpdates, clientVersion, tasks);
+    submitTasks(tasks);
+  }
+
+  /**
+   * Submits the provided task batch for execution. This method defines the task submission strategy
+   * and must be implemented by concrete subclasses to specify whether tasks should be executed
+   * synchronously (blocking until completion) or asynchronously (fire-and-forget).
+   * @param tasks the batch of index write tasks to submit for execution
+   * @throws SingleIndexWriteFailureException if there is an error during task submission or
+   *                                          execution (implementation-dependent)
+   */
+  protected abstract void submitTasks(TaskBatch<Void> tasks)
+    throws SingleIndexWriteFailureException;
+
+  /**
+   * Adds parallel index write tasks to the provided task batch for execution across multiple index
+   * tables. Each index table gets its own task that will be executed in parallel to optimize write
+   * performance.
+   * @param toWrite           a multimap containing index table references as keys and their
+   *                          corresponding mutations as values. Each table will get its own
+   *                          parallel task.
+   * @param allowLocalUpdates if false, skips creating tasks for writes to the same table as the
+   *                          current region to prevent potential deadlocks
+   * @param clientVersion     the Phoenix client version, used for compatibility checks and
+   *                          version-specific behavior in the index write operations
+   * @param tasks             the task batch to which the newly created index write tasks will be
+   *                          added. This batch needs to be submitted for parallel execution by the
+   *                          caller.
+   */
+  private void addTasks(Multimap<HTableInterfaceReference, Mutation> toWrite,
+    boolean allowLocalUpdates, int clientVersion, TaskBatch<Void> tasks) {
     /*
      * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
      * writes in parallel to each index table, so each table gets its own task and is submitted to
@@ -128,7 +160,6 @@ public abstract class AbstractParallelWriterIndexCommitter implements IndexCommi
      */
 
     Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-    tasks = new TaskBatch<Void>(entries.size());
     for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
       // get the mutations for each table. We leak the implementation here a little bit to save
       // doing a complete copy over of all the index update for each table.
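
The refactor above is a template method: write() builds the batch via addTasks() and leaves the submission strategy to subclasses. As a sketch, a hypothetical extra committer in the same package (so that the protected pool and the propagateFailure(...) helper used elsewhere in this diff are reachable) would only need to implement the new hook:

    public class MyAsyncIndexCommitter extends AbstractParallelWriterIndexCommitter {
      @Override
      protected void submitTasks(TaskBatch<Void> tasks)
        throws SingleIndexWriteFailureException {
        try {
          pool.submitOnly(tasks); // async, like the lazy committer below
        } catch (Exception e) {
          propagateFailure(e.getCause()); // submission-time failures only
        }
      }
    }
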
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
index ddbf9f003e..9ac746dab3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
@@ -17,17 +17,31 @@
  */
 package org.apache.phoenix.hbase.index.write;
 
+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Like the {@link ParallelWriterIndexCommitter}, but does not block
  */
 public class LazyParallelWriterIndexCommitter extends AbstractParallelWriterIndexCommitter {
 
-  // for testing
-  public LazyParallelWriterIndexCommitter(String hbaseVersion) {
-    super(hbaseVersion);
-  }
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(LazyParallelWriterIndexCommitter.class);
 
   public LazyParallelWriterIndexCommitter() {
     super();
   }
+
+  @Override
+  protected void submitTasks(TaskBatch<Void> tasks) throws SingleIndexWriteFailureException {
+    try {
+      pool.submitOnly(tasks);
+    } catch (Exception e) {
+      LOGGER.error("Error while submitting the task.", e);
+      propagateFailure(e.getCause());
+    }
+  }
+
 }
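
One nuance of the lazy path above: the catch block can only observe submission-time failures (for example, a rejected or shut-down pool), while exceptions thrown during task execution go unobserved because no futures are collected. A plain java.util.concurrent illustration of that distinction (not Phoenix code):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;

    public class FireAndForgetSketch {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<Void> failingTask = () -> {
          throw new RuntimeException("nobody observes this");
        };
        // Execution failure: captured in a Future that is never read,
        // so it is effectively swallowed.
        pool.submit(failingTask);
        pool.shutdown();
        try {
          // Submission failure: this one IS visible to the caller.
          pool.submit(failingTask);
        } catch (RejectedExecutionException e) {
          System.out.println("submission rejected: " + e);
        }
      }
    }
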
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index cbffbc6579..18a25ae020 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -18,15 +18,12 @@
 package org.apache.phoenix.hbase.index.write;
 
 import java.util.concurrent.ExecutionException;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.Multimap;
-
 /**
  * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
  * any of the index updates fails. Completion is determined by the following criteria: *
@@ -49,12 +46,7 @@ public class ParallelWriterIndexCommitter extends AbstractParallelWriterIndexCom
   }
 
   @Override
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite,
-    final boolean allowLocalUpdates, final int clientVersion)
-    throws SingleIndexWriteFailureException {
-
-    super.write(toWrite, allowLocalUpdates, clientVersion);
-    // actually submit the tasks to the pool and wait for them to finish/fail
+  protected void submitTasks(TaskBatch<Void> tasks) throws SingleIndexWriteFailureException {
     try {
       pool.submitUninterruptible(tasks);
     } catch (EarlyExitFailure e) {
@@ -63,6 +55,5 @@ public class ParallelWriterIndexCommitter extends AbstractParallelWriterIndexCom
       LOGGER.error("Found a failed index update!");
       propagateFailure(e.getCause());
     }
-
   }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
index 8d11748ead..103f0e6df5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
@@ -44,6 +44,9 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
+import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSourceImpl;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.schema.types.PDouble;
@@ -68,7 +71,7 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 /**
  * Tests for BSON.
  */
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class Bson4IT extends ParallelStatsDisabledIT {
 
@@ -291,7 +294,8 @@ public class Bson4IT extends ParallelStatsDisabledIT {
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 
       String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
-        + " CONSTRAINT pk PRIMARY KEY(PK1))";
+        + " CONSTRAINT pk PRIMARY KEY(PK1))"
+        + " \"org.apache.hadoop.hbase.index.lazy.post_batch.write\"=true";
 
       final String indexDdl1;
       if (!this.coveredIndex) {
@@ -367,6 +371,7 @@ public class Bson4IT extends ParallelStatsDisabledIT {
       stmt.executeUpdate();
 
       conn.commit();
+      Thread.sleep(500);
 
       ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tableName);
       assertTrue(rs.next());
@@ -450,9 +455,25 @@ public class Bson4IT extends ParallelStatsDisabledIT {
       assertFalse(rs.next());
 
       validateExplainPlan(ps, tableName, "FULL SCAN ");
+      verifyNoReadRepair();
     }
   }
 
+  private void verifyNoReadRepair() {
+    GlobalIndexCheckerSourceImpl indexCheckerSource =
+      (GlobalIndexCheckerSourceImpl) MetricsIndexerSourceFactory.getInstance()
+        .getGlobalIndexCheckerSource();
+
+    long indexRepairs = indexCheckerSource.getMetricsRegistry()
+      .getCounter(GlobalIndexCheckerSource.INDEX_REPAIR, 0).value();
+    assertEquals("No index repairs should occur during test execution", 0, 
indexRepairs);
+
+    long indexRepairFailures = indexCheckerSource.getMetricsRegistry()
+      .getCounter(GlobalIndexCheckerSource.INDEX_REPAIR_FAILURE, 0).value();
+    assertEquals("No index repair failures should occur during test 
execution", 0,
+      indexRepairFailures);
+  }
+
   @Test
   public void testBsonValueFunctionWithBSONType() throws Exception {
     Assume.assumeTrue(this.coveredIndex && this.columnEncoded); // Since indexing on BSON not
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 69b81826d5..03ca9e1a68 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -68,7 +68,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
   private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
   private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
   private static final int ROW_LOCK_WAIT_TIME = 10000;
-  private static final int MAX_LOOKBACK_AGE = 1000000;
+  protected static final int MAX_LOOKBACK_AGE = 1000000;
   private final Object lock = new Object();
 
   public ConcurrentMutationsExtendedIT(boolean uncovered) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
new file mode 100644
index 0000000000..761c8658eb
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.util.Map;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch write enabled.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentMutationsLazyPostBatchWriteIT extends ConcurrentMutationsExtendedIT {
+
+  public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered) {
+    super(uncovered);
+    Assume.assumeFalse("Only covered index supports lazy post batch write mode", uncovered);
+  }
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+    props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+    props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
+    props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Integer.toString(MAX_LOOKBACK_AGE));
+    props.put("hbase.rowlock.wait.duration", "100");
+    props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+}
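
To try the feature out: going by the tests in this commit, lazy post-batch index writes can be enabled per table via the table property exercised in Bson4IT above, or cluster-wide via the IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE config key (assumed here to resolve to the same string as the table property). A minimal JDBC sketch with a hypothetical table name:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    class EnableLazyPostBatchWrite {
      static void createTable(String jdbcUrl) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement()) {
          // Table property key taken verbatim from the Bson4IT DDL above.
          stmt.execute("CREATE TABLE EXAMPLE_TABLE"
            + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
            + " CONSTRAINT pk PRIMARY KEY(PK1))"
            + " \"org.apache.hadoop.hbase.index.lazy.post_batch.write\"=true");
        }
      }
    }
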
