This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new f4e88ed6f8 PHOENIX-7884 : Refactor tracking of IndexCdcConsumer lag (#2506)
f4e88ed6f8 is described below

commit f4e88ed6f840c7e27b88adcbf75b7cd80a070336
Author: Palash Chauhan <[email protected]>
AuthorDate: Tue Jun 23 12:35:52 2026 -0700

    PHOENIX-7884 : Refactor tracking of IndexCdcConsumer lag (#2506)
---
 .../metrics/MetricsIndexCDCConsumerSource.java     |   6 +-
 .../phoenix/hbase/index/IndexCDCConsumer.java      | 117 ++++++++++++-----
 .../hbase/index/IndexCDCConsumerProgress.java      |  92 +++++++++++++
 phoenix-core/pom.xml                               |   5 +
 .../phoenix/end2end/IndexCDCConsumerLagIT.java     | 146 +++++++++++++++++++++
 .../hbase/index/IndexCDCConsumerProgressTest.java  | 121 +++++++++++++++++
 6 files changed, 455 insertions(+), 32 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
index c278c1fa4e..cb39b5b38a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
@@ -52,7 +52,11 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource {
 
   String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag";
   String CDC_INDEX_UPDATE_LAG_DESC =
-    "Histogram for the lag in milliseconds between current time and the last 
processed CDC event";
+    "Histogram of current time minus the consumer's effective freshness 
watermark, in "
+      + "milliseconds. The watermark advances on successful own-partition 
batches AND on empty "
+      + "polls (which prove caught-up to queryStart - timestampBufferMs). Idle 
steady state is "
+      + "≈ timestampBufferMs; grows during sustained failure, parent-region 
replay, or cold "
+      + "start (where it is floored at now - consumerStartTime).";
 
   /**
    * Updates the CDC batch processing time histogram.
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index 6c4ceab789..412e380b66 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -112,7 +112,7 @@ public class IndexCDCConsumer implements Runnable {
    */
   public static final String INDEX_CDC_CONSUMER_POLL_INTERVAL_MS =
     "phoenix.index.cdc.consumer.poll.interval.ms";
-  private static final long DEFAULT_POLL_INTERVAL_MS = 1000;
+  private static final long DEFAULT_POLL_INTERVAL_MS = 500;
 
   /**
    * The time buffer in milliseconds subtracted from current time when 
querying CDC mutations to
@@ -130,16 +130,25 @@ public class IndexCDCConsumer implements Runnable {
    */
   public static final String INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES =
     "phoenix.index.cdc.consumer.max.data.visibility.retries";
-  private static final int DEFAULT_MAX_DATA_VISIBILITY_RETRIES = 10;
+  private static final int DEFAULT_MAX_DATA_VISIBILITY_RETRIES = 15;
 
   public static final String INDEX_CDC_CONSUMER_RETRY_PAUSE_MS =
     "phoenix.index.cdc.consumer.retry.pause.ms";
-  private static final long DEFAULT_RETRY_PAUSE_MS = 2000;
+  private static final long DEFAULT_RETRY_PAUSE_MS = 200;
 
   public static final String INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS =
     "phoenix.index.cdc.consumer.parent.progress.pause.ms";
   private static final long DEFAULT_PARENT_PROGRESS_PAUSE_MS = 15000;
 
+  /**
+   * Interval between {@code cdcIndexUpdateLag} samples emitted while the 
consumer is sleeping (idle
+   * poll, backoff, parent-progress wait, etc.). Clamped to at least 50ms.
+   */
+  public static final String INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS =
+    "phoenix.index.cdc.consumer.lag.sample.interval.ms";
+  private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 1000L;
+  private static final long MIN_LAG_SAMPLE_INTERVAL_MS = 50L;
+
   private final RegionCoprocessorEnvironment env;
   private final String dataTableName;
   private final String encodedRegionName;
@@ -154,6 +163,12 @@ public class IndexCDCConsumer implements Runnable {
   private final Configuration config;
   private final boolean serializeCDCMutations;
   private final MetricsIndexCDCConsumerSource metricSource;
+  private final long lagSampleIntervalMs;
+  private final IndexCDCConsumerProgress progress;
+  // Flipped true once hasEventuallyConsistentIndexes() confirms this region 
actually has an EC
+  // index. Until then sleepWithLagSampling does not emit, so tables that 
immediately exit "no EC
+  // index" produce no cold-start lag samples into the global / per-table 
histograms.
+  private volatile boolean lagEmissionEnabled = false;
   private volatile boolean stopped = false;
   private Thread consumerThread;
   private boolean hasParentPartitions = false;
@@ -228,7 +243,11 @@ public class IndexCDCConsumer implements Runnable {
       DEFAULT_MAX_DATA_VISIBILITY_RETRIES);
     this.parentProgressPauseMs =
       config.getLong(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, 
DEFAULT_PARENT_PROGRESS_PAUSE_MS);
+    this.lagSampleIntervalMs = Math.max(MIN_LAG_SAMPLE_INTERVAL_MS,
+      config.getLong(INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS, 
DEFAULT_LAG_SAMPLE_INTERVAL_MS));
     this.metricSource = 
MetricsIndexerSourceFactory.getInstance().getIndexCDCConsumerSource();
+    this.progress = new 
IndexCDCConsumerProgress(EnvironmentEdgeManager.currentTimeMillis(),
+      this.timestampBufferMs);
     DelegateRegionCoprocessorEnvironment indexWriterEnv =
       new DelegateRegionCoprocessorEnvironment(env, 
ConnectionType.INDEX_WRITER_CONNECTION);
     this.indexWriter =
@@ -259,13 +278,23 @@ public class IndexCDCConsumer implements Runnable {
   }
 
   /**
-   * Sleeps for the specified duration if the consumer has not been stopped.
-   * @param millis the duration to sleep in milliseconds.
-   * @throws InterruptedException if the thread is interrupted while sleeping.
+   * Sleeps for up to {@code totalMillis}, emitting a {@code 
cdcIndexUpdateLag} sample at the start
+   * of each {@code lagSampleIntervalMs} slice once {@link 
#lagEmissionEnabled} is set. Aborts
+   * immediately when stopped. Used for all consumer-thread sleeps so the lag 
metric stays
+   * non-silent across idle, failure, post-startup, and parent-replay phases.
    */
-  private void sleepIfNotStopped(long millis) throws InterruptedException {
-    if (!stopped) {
-      Thread.sleep(millis);
+  private void sleepWithLagSampling(long totalMillis) throws 
InterruptedException {
+    long deadline = EnvironmentEdgeManager.currentTimeMillis() + totalMillis;
+    while (!stopped) {
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      if (lagEmissionEnabled) {
+        metricSource.updateCdcLag(dataTableName, progress.currentLagMs(now));
+      }
+      long remaining = deadline - now;
+      if (remaining <= 0) {
+        return;
+      }
+      Thread.sleep(Math.min(remaining, lagSampleIntervalMs));
     }
   }
 
@@ -374,7 +403,7 @@ public class IndexCDCConsumer implements Runnable {
           "Error while retrieving partition keys from CDC_STREAM for partition 
{} table {}. "
             + "Retry #{}, sleeping {} ms before retrying...",
           partitionId, dataTableName, retryCount, sleepTime, e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     return null;
@@ -405,7 +434,7 @@ public class IndexCDCConsumer implements Runnable {
   public void run() {
     try {
       if (startupDelayMs > 0 && getCDCStreamNumPartitions() <= 1) {
-        sleepIfNotStopped(startupDelayMs);
+        sleepWithLagSampling(startupDelayMs);
       }
       if (stopped) {
         return;
@@ -415,6 +444,9 @@ public class IndexCDCConsumer implements Runnable {
           dataTableName);
         return;
       }
+      // Only enable lag sampling once we've confirmed this table actually has 
an EC index,
+      // so non-EC-indexed tables don't pollute the lag histograms with 
cold-start samples.
+      lagEmissionEnabled = true;
       LOG.info(
         "IndexCDCConsumer started for table {} region {}"
           + " [batchSize: {}, pollIntervalMs: {}, timestampBufferMs: {}, 
startupDelayMs: {},"
@@ -438,13 +470,14 @@ public class IndexCDCConsumer implements Runnable {
           dataTableName, encodedRegionName);
         return;
       } else if (lastProcessedTimestamp > 0) {
+        progress.recordProcessed(lastProcessedTimestamp);
         LOG.info(
           "Found existing tracker entry for table {} region {} with 
lastTimestamp {}. "
             + "Resuming from last position (region movement scenario).",
           dataTableName, encodedRegionName, lastProcessedTimestamp);
       } else {
         if (hasParentPartitions) {
-          sleepIfNotStopped(timestampBufferMs + 1);
+          sleepWithLagSampling(timestampBufferMs + 1);
           replayAndCompleteParentRegions(encodedRegionName);
         } else {
           LOG.info("No parent partitions for table {} region {}, skipping 
parent replay",
@@ -463,10 +496,10 @@ public class IndexCDCConsumer implements Runnable {
               lastProcessedTimestamp, false);
           }
           if (lastProcessedTimestamp == previousTimestamp) {
-            sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, 
++retryCount));
+            sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, 
++retryCount));
           } else {
             retryCount = 0;
-            sleepIfNotStopped(pollIntervalMs);
+            sleepWithLagSampling(pollIntervalMs);
           }
         } catch (Exception e) {
           if (e instanceof InterruptedException) {
@@ -478,7 +511,7 @@ public class IndexCDCConsumer implements Runnable {
             "Error processing CDC mutations for table {} region {}. "
               + "Retry #{}, sleeping {} ms before retrying...",
             dataTableName, encodedRegionName, retryCount, sleepTime, e);
-          sleepIfNotStopped(sleepTime);
+          sleepWithLagSampling(sleepTime);
         }
       }
     } catch (InterruptedException e) {
@@ -518,7 +551,7 @@ public class IndexCDCConsumer implements Runnable {
           "Error checking for eventually consistent indexes for table {}. "
             + "Retry #{}, sleeping {} ms before retrying...",
           dataTableName, retryCount, sleepTime, e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     return false;
@@ -559,7 +592,7 @@ public class IndexCDCConsumer implements Runnable {
           "Error getting CDC_STREAM row count for table {}. "
             + "Retry #{}, sleeping {} ms before retrying...",
           dataTableName, retryCount, sleepTime, e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     return -1;
@@ -597,14 +630,14 @@ public class IndexCDCConsumer implements Runnable {
           "CDC_STREAM entry not found for table {} partition {}. "
             + "Attempt #{}, sleeping {} ms before retrying...",
           dataTableName, encodedRegionName, retryCount, sleepTime);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       } catch (SQLException e) {
         long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
         LOG.warn(
           "Error checking CDC_STREAM for table {} partition {}. "
             + "Retry #{}, sleeping {} ms before retrying...",
           dataTableName, encodedRegionName, retryCount, sleepTime, e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     return false;
@@ -662,7 +695,7 @@ public class IndexCDCConsumer implements Runnable {
           "Error checking IDX_CDC_TRACKER for table {} partition {} owner {}. "
             + "Retry #{}, sleeping {} ms before retrying...",
           dataTableName, partitionId, ownerPartitionId, retryCount, sleepTime, 
e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     return 0;
@@ -695,7 +728,7 @@ public class IndexCDCConsumer implements Runnable {
           "Error checking if partition {} is completed for table {}. "
             + "Retry #{}, sleeping {} ms before retrying...",
           partitionId, dataTableName, retryCount, sleepTime, e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     return false;
@@ -739,7 +772,7 @@ public class IndexCDCConsumer implements Runnable {
           "Error getting parent progress for partition {} table {}. "
             + "Retry #{}, sleeping {} ms before retrying...",
           partitionId, dataTableName, retryCount, sleepTime, e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     throw new InterruptedException("IndexCDCConsumer stopped while getting 
parent progress.");
@@ -778,7 +811,7 @@ public class IndexCDCConsumer implements Runnable {
           "Error querying parent partitions from CDC_STREAM for table {} 
partition {}. "
             + "Retry #{}, sleeping {} ms before retrying...",
           dataTableName, partitionId, retryCount, sleepTime, e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     return Collections.emptyList();
@@ -812,7 +845,7 @@ public class IndexCDCConsumer implements Runnable {
             long previousOtherProgress;
             do {
               previousOtherProgress = otherProgress;
-              sleepIfNotStopped(parentProgressPauseMs);
+              sleepWithLagSampling(parentProgressPauseMs);
               if (isPartitionCompleted(partitionId)) {
                 return;
               }
@@ -856,7 +889,7 @@ public class IndexCDCConsumer implements Runnable {
             + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms",
           partitionId, ownerPartitionId, dataTableName, 
currentLastProcessedTimestamp, retryCount,
           sleepTime, e);
-        sleepIfNotStopped(sleepTime);
+        sleepWithLagSampling(sleepTime);
       }
     }
     LOG.info("Processing partition {} (owner {}) stopped before completion for 
table {}",
@@ -943,7 +976,11 @@ public class IndexCDCConsumer implements Runnable {
       long newLastTimestamp = lastProcessedTimestamp;
       boolean hasMoreRows = true;
       int retryCount = 0;
+      // Captured immediately before each query so the empty-poll watermark 
cannot over-advance
+      // past what the query's own (now - timestampBufferMs) upper bound 
actually proved empty.
+      long lastQueryStartTime = newLastTimestamp;
       while (hasMoreRows && batchMutations.isEmpty()) {
+        lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis();
         try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
           setStatementParams(scanInfo, partitionId, isParentReplay, 
newLastTimestamp, ps);
           Pair<Long, Boolean> result =
@@ -952,11 +989,15 @@ public class IndexCDCConsumer implements Runnable {
           if (hasMoreRows) {
             newLastTimestamp = result.getFirst();
             if (batchMutations.isEmpty()) {
-              sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, 
++retryCount));
+              sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, 
++retryCount));
             }
           }
         }
       }
+      // Empty own-partition poll proves we are caught up to (queryStart - 
timestampBufferMs).
+      if (!hasMoreRows && !isParentReplay) {
+        progress.recordEmptyPoll(lastQueryStartTime);
+      }
       // With predefined LIMIT, there might be more rows with the same 
timestamp that were not
       // included in this batch.
       if (newLastTimestamp > lastProcessedTimestamp) {
@@ -985,8 +1026,9 @@ public class IndexCDCConsumer implements Runnable {
         metricSource.updateCdcBatchProcessTime(dataTableName,
           EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
         metricSource.incrementCdcBatchCount(dataTableName);
-        metricSource.updateCdcLag(dataTableName,
-          EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp);
+        if (!isParentReplay) {
+          progress.recordProcessed(newLastTimestamp);
+        }
         updateTrackerProgress(conn, partitionId, ownerPartitionId, 
newLastTimestamp,
           PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
       }
@@ -1052,7 +1094,11 @@ public class IndexCDCConsumer implements Runnable {
       long[] lastScannedTimestamp = { lastProcessedTimestamp };
       boolean hasMoreRows = true;
       int retryCount = 0;
+      // Captured immediately before each query so the empty-poll watermark 
cannot over-advance
+      // past what the query's own (now - timestampBufferMs) upper bound 
actually proved empty.
+      long lastQueryStartTime = newLastTimestamp;
       while (hasMoreRows && batchStates.isEmpty()) {
+        lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis();
         try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
           setStatementParams(scanInfo, partitionId, isParentReplay, 
newLastTimestamp, ps);
           Pair<Long, Boolean> result =
@@ -1067,16 +1113,24 @@ public class IndexCDCConsumer implements Runnable {
                   + " to {} after {} retries — data table mutations may have 
failed",
                 dataTableName, partitionId, newLastTimestamp, 
lastScannedTimestamp[0], retryCount);
               newLastTimestamp = lastScannedTimestamp[0];
+              // NOTE: durable tracker advances below (newLastTimestamp > 
lastProcessedTimestamp)
+              // but progress.recordProcessed is skipped 
(batchStates.isEmpty()). The in-memory
+              // watermark lags durable state until the next empty poll heals 
it — over-reports
+              // lag temporarily (safe direction, self-healing).
               break;
             } else {
               // CDC index entries are written but the data is not yet visible.
               // Don't advance newLastTimestamp so the same events are 
re-fetched
               // once the data becomes visible.
-              sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, 
++retryCount));
+              sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, 
++retryCount));
             }
           }
         }
       }
+      // Empty own-partition poll proves we are caught up to (queryStart - 
timestampBufferMs).
+      if (!hasMoreRows && !isParentReplay) {
+        progress.recordEmptyPoll(lastQueryStartTime);
+      }
       if (newLastTimestamp > lastProcessedTimestamp) {
         String sameTimestampQuery = String.format(
           "SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), 
\"CDC JSON\" "
@@ -1106,8 +1160,9 @@ public class IndexCDCConsumer implements Runnable {
         metricSource.updateCdcBatchProcessTime(dataTableName,
           EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
         metricSource.incrementCdcBatchCount(dataTableName);
-        metricSource.updateCdcLag(dataTableName,
-          EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp);
+        if (!isParentReplay) {
+          progress.recordProcessed(newLastTimestamp);
+        }
       }
       if (newLastTimestamp > lastProcessedTimestamp) {
         updateTrackerProgress(conn, partitionId, ownerPartitionId, 
newLastTimestamp,
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java
new file mode 100644
index 0000000000..9c6e452a9d
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+/**
+ * Observable progress of an {@link IndexCDCConsumer}.
+ * <p>
+ * Thread-safety: <b>single-writer.</b> The owning consumer thread is the only 
writer of
+ * {@link #recordProcessed} / {@link #recordEmptyPoll}; the same thread also 
reads via
+ * {@link #currentLagMs}. {@code volatile} fields provide safe publication for 
an off-thread
+ * observer (e.g. JMX scraper) that may read {@link #getEffectiveWatermark} or
+ * {@link #getLastEmptyPollEndTime}, but no off-thread <i>writer</i> is 
supported. Watermark is
+ * monotonic; lag is clamped to zero on clock regressions.
+ */
+final class IndexCDCConsumerProgress {
+
+  // Wall-clock time at consumer construction; floor for currentLagMs before 
any signal.
+  private final long consumerStartTime;
+  // Read-back buffer the consumer subtracts from `now` on its own-partition 
CDC filter.
+  private final long timestampBufferMs;
+
+  // Latest CDC event timestamp this consumer has acknowledged for its own 
partition.
+  private volatile long lastProcessedTimestamp = 0L;
+  // Wall-clock time captured immediately before the most recent own-partition 
CDC poll that
+  // returned zero rows. Equals the upper bound of the poll's "no events exist 
below" proof:
+  // (lastEmptyPollEndTime - timestampBufferMs) is the latest CDC ts the 
consumer is caught up to.
+  private volatile long lastEmptyPollEndTime = 0L;
+  // Highest own-partition CDC timestamp the consumer is confirmed caught up 
to (monotonic).
+  private volatile long effectiveWatermark = 0L;
+
+  IndexCDCConsumerProgress(long consumerStartTime, long timestampBufferMs) {
+    this.consumerStartTime = consumerStartTime;
+    this.timestampBufferMs = timestampBufferMs;
+  }
+
+  /**
+   * Record progress from a successful own-partition batch. Single-writer 
(consumer thread only).
+   */
+  void recordProcessed(long ts) {
+    if (ts > lastProcessedTimestamp) {
+      lastProcessedTimestamp = ts;
+    }
+    advanceWatermark();
+  }
+
+  /** Record an own-partition CDC poll that returned zero rows. Single-writer. 
*/
+  void recordEmptyPoll(long queryStartWallClock) {
+    if (queryStartWallClock > lastEmptyPollEndTime) {
+      lastEmptyPollEndTime = queryStartWallClock;
+    }
+    advanceWatermark();
+  }
+
+  /** Current lag in milliseconds. Floors at {@code now - consumerStartTime} 
before any signal. */
+  long currentLagMs(long now) {
+    long base = effectiveWatermark > 0 ? effectiveWatermark : 
consumerStartTime;
+    long lag = now - base;
+    return lag < 0 ? 0 : lag;
+  }
+
+  private void advanceWatermark() {
+    long emptyPollWatermark =
+      lastEmptyPollEndTime > 0 ? lastEmptyPollEndTime - timestampBufferMs : 0L;
+    long candidate = Math.max(lastProcessedTimestamp, emptyPollWatermark);
+    if (candidate > effectiveWatermark) {
+      effectiveWatermark = candidate;
+    }
+  }
+
+  long getEffectiveWatermark() {
+    return effectiveWatermark;
+  }
+
+  long getLastEmptyPollEndTime() {
+    return lastEmptyPollEndTime;
+  }
+}
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 6067ef4eb7..739f738a55 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -86,6 +86,11 @@
       <artifactId>hbase-server</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-metrics-api</artifactId>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java
new file mode 100644
index 0000000000..158c54d276
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_POLL_INTERVAL_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_INDEX_UPDATE_LAG;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import 
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSourceImpl;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Verifies the {@code cdcIndexUpdateLag} histogram keeps receiving samples 
while the consumer is
+ * idle.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexCDCConsumerLagIT extends ParallelStatsDisabledIT {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IndexCDCConsumerLagIT.class);
+
+  private static final int TIMESTAMP_BUFFER_MS = 2_000;
+  private static final int POLL_INTERVAL_MS = 500;
+  private static final int LAG_SAMPLE_INTERVAL_MS = 500;
+  // Small retry pause so empty-poll backoff doesn't dominate idle behavior.
+  private static final int RETRY_PAUSE_MS = 100;
+  // Budget for the consumer to start up and emit its first lag sample. 
Generous because the
+  // consumer waits up to INDEX_CDC_CONSUMER_STARTUP_DELAY_MS (default 10s) 
and then performs
+  // CDC_STREAM / IDX_CDC_TRACKER lookups before its first poll. Sized for 
slow CI / cold JVM.
+  private static final long CONSUMER_STARTUP_BUDGET_MS = 120_000L;
+  // Idle window for the flow check. Only need to prove ≥ 1 sample fires; kept 
short to keep
+  // total test runtime low.
+  private static final long IDLE_WAIT_MS = 5_000L;
+  private static final long MAX_LOOKBACK_AGE = 1_000_000L;
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(10);
+    
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
+    
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Long.toString(MAX_LOOKBACK_AGE));
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, 
Boolean.TRUE.toString());
+    props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, 
Integer.toString(TIMESTAMP_BUFFER_MS));
+    props.put(INDEX_CDC_CONSUMER_POLL_INTERVAL_MS, 
Integer.toString(POLL_INTERVAL_MS));
+    props.put(INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS, 
Integer.toString(LAG_SAMPLE_INTERVAL_MS));
+    props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, 
Integer.toString(RETRY_PAUSE_MS));
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+  private Connection getConnection() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    return DriverManager.getConnection(getUrl(), props);
+  }
+
+  private MetricHistogram lagHistogram() {
+    MetricsIndexCDCConsumerSourceImpl source =
+      (MetricsIndexCDCConsumerSourceImpl) 
MetricsIndexerSourceFactory.getInstance()
+        .getIndexCDCConsumerSource();
+    return source.getMetricsRegistry().getHistogram(CDC_INDEX_UPDATE_LAG);
+  }
+
+  /**
+   * Polls until the lag histogram has at least {@code minCount} samples, or 
fails after timeout.
+   */
+  private void awaitMinCount(long minCount, long timeoutMs) throws 
InterruptedException {
+    long deadline = System.currentTimeMillis() + timeoutMs;
+    long observed = 0L;
+    while (System.currentTimeMillis() < deadline) {
+      observed = lagHistogram().getCount();
+      if (observed >= minCount) {
+        return;
+      }
+      Thread.sleep(500L);
+    }
+    fail("Lag histogram never reached count=" + minCount + " within " + 
timeoutMs + "ms; observed="
+      + observed);
+  }
+
+  @Test
+  public void testLagMetricKeepsSamplingWhenIdle() throws Exception {
+    String tableName = generateUniqueName();
+    String indexName = generateUniqueName();
+
+    try (Connection conn = getConnection()) {
+      conn.createStatement().execute("CREATE TABLE " + tableName
+        + " (PK VARCHAR NOT NULL PRIMARY KEY," + " V1 VARCHAR, V2 VARCHAR) 
COLUMN_ENCODED_BYTES=0");
+      conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName
+        + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL");
+      conn.createStatement()
+        .execute("UPSERT INTO " + tableName + " (PK, V1, V2) VALUES ('r1', 
'v1', 'd1')");
+      conn.commit();
+    }
+
+    // Wait for the consumer thread to start and emit its first lag sample. 
Replaces a fixed
+    // settle sleep so the test is robust to slow CI / cold JVM startup.
+    awaitMinCount(1L, CONSUMER_STARTUP_BUDGET_MS);
+
+    long countBeforeIdle = lagHistogram().getCount();
+    Thread.sleep(IDLE_WAIT_MS);
+    long countAfterIdle = lagHistogram().getCount();
+    long delta = countAfterIdle - countBeforeIdle;
+    LOG.info("Idle window {}ms: countBefore={}, countAfter={}, delta={}", 
IDLE_WAIT_MS,
+      countBeforeIdle, countAfterIdle, delta);
+
+    assertTrue("Histogram count did not advance during idle; delta=" + delta, 
delta >= 1);
+  }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java
new file mode 100644
index 0000000000..3c87d8849b
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/** Unit tests for the watermark and lag bookkeeping of {@link IndexCDCConsumerProgress}. */
+public class IndexCDCConsumerProgressTest {
+
+  private static final long POLL_BUFFER_MS = 5_000L; // margin subtracted from empty-poll ends
+  private static final long CONSUMER_START_MS = 1_000_000L; // start time given to every tracker
+
+  private static IndexCDCConsumerProgress freshProgress() {
+    return new IndexCDCConsumerProgress(CONSUMER_START_MS, POLL_BUFFER_MS);
+  }
+
+  @Test
+  public void coldStartReportsLagSinceConsumerStart() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    // Nothing processed or polled yet, so lag is measured from the consumer start time.
+    assertEquals(100L, progress.currentLagMs(CONSUMER_START_MS + 100L));
+    assertEquals(0L, progress.getEffectiveWatermark());
+  }
+
+  @Test
+  public void processedAdvancesWatermark() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    long batchTs = CONSUMER_START_MS + 10_000L;
+    progress.recordProcessed(batchTs);
+    assertEquals(batchTs, progress.getEffectiveWatermark());
+    assertEquals(2_000L, progress.currentLagMs(batchTs + 2_000L));
+  }
+
+  @Test
+  public void processedIsMonotonic() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    progress.recordProcessed(CONSUMER_START_MS + 1_000L);
+    progress.recordProcessed(CONSUMER_START_MS + 500L); // older batch must not rewind
+    assertEquals(CONSUMER_START_MS + 1_000L, progress.getEffectiveWatermark());
+  }
+
+  @Test
+  public void emptyPollAdvancesWatermarkBuffersBelowPollTime() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    long pollEndMs = CONSUMER_START_MS + 20_000L;
+    progress.recordEmptyPoll(pollEndMs);
+    assertEquals(pollEndMs - POLL_BUFFER_MS, progress.getEffectiveWatermark());
+    // Measured at the poll's own end time, the lag is exactly the buffer.
+    assertEquals(POLL_BUFFER_MS, progress.currentLagMs(pollEndMs));
+  }
+
+  @Test
+  public void emptyPollIsMonotonic() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    long latestPollEnd = CONSUMER_START_MS + 20_000L;
+    progress.recordEmptyPoll(latestPollEnd);
+    progress.recordEmptyPoll(CONSUMER_START_MS + 10_000L);
+    assertEquals(latestPollEnd, progress.getLastEmptyPollEndTime());
+    assertEquals(latestPollEnd - POLL_BUFFER_MS, progress.getEffectiveWatermark());
+  }
+
+  @Test
+  public void watermarkIsMaxOfProcessedAndEmptyPollFloor() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    long processedTs = CONSUMER_START_MS + 50_000L;
+    progress.recordProcessed(processedTs);
+    progress.recordEmptyPoll(CONSUMER_START_MS + 30_000L);
+    // The poll's floor (its end minus the buffer) is below processedTs, so processedTs wins.
+    assertEquals(processedTs, progress.getEffectiveWatermark());
+
+    // A poll ending more than one buffer past processedTs lifts the watermark again.
+    long latePollEnd = processedTs + POLL_BUFFER_MS + 1_000L;
+    progress.recordEmptyPoll(latePollEnd);
+    assertEquals(latePollEnd - POLL_BUFFER_MS, progress.getEffectiveWatermark());
+  }
+
+  @Test
+  public void idleAfterEmptyPollStaysBoundedByBufferPlusElapsed() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    long pollEndMs = CONSUMER_START_MS + 20_000L;
+    progress.recordEmptyPoll(pollEndMs);
+    long idleMs = 7_500L;
+    assertEquals(POLL_BUFFER_MS + idleMs, progress.currentLagMs(pollEndMs + idleMs));
+  }
+
+  @Test
+  public void negativeLagClampedToZero() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    long processedTs = CONSUMER_START_MS + 50_000L;
+    progress.recordProcessed(processedTs);
+    // "now" is earlier than the watermark (clock went backwards); lag must not go negative.
+    assertEquals(0L, progress.currentLagMs(processedTs - 100L));
+  }
+
+  @Test
+  public void emptyPollOlderThanBufferContributesNothing() {
+    IndexCDCConsumerProgress progress = freshProgress();
+    long earlyPollEnd = POLL_BUFFER_MS - 1L;
+    progress.recordEmptyPoll(earlyPollEnd);
+    // earlyPollEnd - buffer is negative, so it must not pollute the watermark.
+    assertEquals(earlyPollEnd, progress.getLastEmptyPollEndTime());
+    assertEquals(0L, progress.getEffectiveWatermark());
+  }
+}


Reply via email to