Copilot commented on code in PR #2489:
URL: https://github.com/apache/phoenix/pull/2489#discussion_r3352450609


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java:
##########
@@ -83,7 +95,7 @@ public class ReplicationLogReplayService {
   private ScheduledExecutorService scheduler;
   private volatile boolean isRunning = false;
 
-  private ReplicationLogReplayService(final Configuration conf) {
+  protected ReplicationLogReplayService(final Configuration conf) {
     this.conf = conf;
   }

Review Comment:
   The constructor visibility was widened from private to protected, which 
increases the class's instantiation surface area despite this being a singleton 
accessed via getInstance(). Since there are no in-repo subclasses and tests can 
inject a mock via setInstanceForTesting(), keeping the constructor private 
better preserves the singleton invariant.
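A minimal sketch of the shape this comment argues for. It uses a hypothetical class name in place of ReplicationLogReplayService and drops the Configuration argument. The constructor stays private, so getInstance() is the only way to obtain an instance. Double-checked locking on a volatile field is one common lazy-initialization choice. It is an assumption here, not necessarily what the PR's getInstance(Configuration) does.

```java
/** Hypothetical stand-in for a singleton service whose constructor stays private. */
final class ReplayServiceSingletonSketch {
  // volatile so a fully constructed instance is visible to every thread that reads it.
  private static volatile ReplayServiceSingletonSketch instance;

  // Private: code outside this class cannot construct a second instance.
  private ReplayServiceSingletonSketch() {
  }

  static ReplayServiceSingletonSketch getInstance() {
    ReplayServiceSingletonSketch local = instance;
    if (local == null) {
      synchronized (ReplayServiceSingletonSketch.class) {
        // Re-check under the lock in case another thread initialized it first.
        local = instance;
        if (local == null) {
          local = new ReplayServiceSingletonSketch();
          instance = local;
        }
      }
    }
    return local;
  }
}
```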



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -199,6 +200,17 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store,
     this.maxLookbackWindowStart = this.maxLookbackInMillis == 0
       ? compactionTime
       : compactionTime - (this.maxLookbackInMillis + 1);
+    Configuration conf = env.getConfiguration();
+    boolean replayEnabled =
+      conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
+        ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED);
+    boolean guardEnabled =
+      conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED,
+        ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED);
+    if (replayEnabled && guardEnabled) {
+      this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard(
+        this.maxLookbackWindowStart, conf, tableName, columnFamilyName);
+    }

Review Comment:
   The replication consistency guard is currently applied on every
CompactionScanner instantiation, including flushes and minor compactions. This
adds unnecessary overhead, because each instantiation fetches the consistency point. It also
can broaden retention behavior beyond major compactions, even though
delete-marker purging is primarily relevant during major compaction. Consider
applying the guard only when the scanner serves a major compaction.
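A sketch of that gating. It assumes the scanner can tell whether it is serving a major compaction; the `majorCompaction` parameter and the class name are hypothetical. The window computation mirrors the `maxLookbackWindowStart` expression in the hunk above. A `LongSupplier` stands in for the consistency-point lookup so the example runs without Phoenix. The real getConsistencyPoint() declares checked exceptions, and this sketch does not model them.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: consult the consistency point only for major compactions. */
final class GuardGatingSketch {

  // Same expression as the maxLookbackWindowStart computation in CompactionScanner.
  static long baseWindowStart(long compactionTime, long maxLookbackInMillis) {
    return maxLookbackInMillis == 0
      ? compactionTime
      : compactionTime - (maxLookbackInMillis + 1);
  }

  static long effectiveWindowStart(long compactionTime, long maxLookbackInMillis,
      boolean replayEnabled, boolean guardEnabled, boolean majorCompaction,
      LongSupplier consistencyPoint) {
    long windowStart = baseWindowStart(compactionTime, maxLookbackInMillis);
    // Flushes and minor compactions skip the consistency-point lookup entirely.
    if (replayEnabled && guardEnabled && majorCompaction) {
      windowStart = Math.min(windowStart, consistencyPoint.getAsLong());
    }
    return windowStart;
  }
}
```

This design keeps the lookup cost and the wider retention confined to the compaction type where delete markers are actually dropped.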



##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Tests for the replication consistency point guard in ReplicationLogReplayService. Tests the pure
+ * adjustment logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring
+ * maxLookbackWindowStart is floored to the consistency point.
+ */
+public class ReplicationCompactionGuardTest {
+
+  private static final String TABLE_NAME = "TEST_TABLE";
+  private static final String CF_NAME = "0";
+
+  @Test
+  public void testAdjustsWindowWhenConsistencyPointIsLower() {
+    long maxLookbackWindowStart = 1000000L;
+    long consistencyPoint = 500000L;
+
+    long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart,
+      consistencyPoint, TABLE_NAME, CF_NAME);
+
+    assertEquals(consistencyPoint, result);
+  }
+
+  @Test
+  public void testNoChangeWhenConsistencyPointIsHigher() {
+    long maxLookbackWindowStart = 500000L;
+    long consistencyPoint = 1000000L;
+
+    long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart,
+      consistencyPoint, TABLE_NAME, CF_NAME);
+
+    assertEquals(maxLookbackWindowStart, result);
+  }
+
+  @Test
+  public void testNoChangeWhenConsistencyPointEqualsWindowStart() {
+    long maxLookbackWindowStart = 500000L;
+    long consistencyPoint = 500000L;
+
+    long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart,
+      consistencyPoint, TABLE_NAME, CF_NAME);
+
+    assertEquals(maxLookbackWindowStart, result);
+  }
+
+  @Test
+  public void testConsistencyPointAtZeroRetainsAll() {
+    long maxLookbackWindowStart = 1000000L;
+    long consistencyPoint = 0L;
+
+    long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart,
+      consistencyPoint, TABLE_NAME, CF_NAME);
+
+    assertEquals(0L, result);
+  }
+
+  @Test
+  public void testLargeTimestampsNoAdjustmentNeeded() {
+    long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L;
+    long consistencyPoint = System.currentTimeMillis() - 120000L;
+

Review Comment:
   These tests use System.currentTimeMillis() to construct inputs, which makes 
them less deterministic than necessary. Using a fixed reference timestamp keeps 
the test purely about the adjustment logic and avoids any potential timing 
flakiness.
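One way to apply this, sketched with hypothetical constant names. Every input is derived from a single fixed reference instant. The offsets are the only thing that varies: one day, two minutes, and one week, matching the literals in these tests. The `adjust` method copies the `Math.min` floor from adjustMaxLookbackWindowStart so the sketch runs on its own. In the real test, the same constants would feed the existing `assertEquals` calls.

```java
/** Hypothetical fixed-clock inputs for the adjustment tests. */
final class FixedClockInputsSketch {
  // Any constant works; it replaces System.currentTimeMillis() so every run sees the same inputs.
  static final long REFERENCE_TIME_MS = 1_700_000_000_000L;
  static final long ONE_DAY_MS = 86_400_000L;
  static final long TWO_MINUTES_MS = 120_000L;
  static final long ONE_WEEK_MS = 604_800_000L;

  // Same floor as adjustMaxLookbackWindowStart (Math.min), without the logging.
  static long adjust(long maxLookbackWindowStart, long consistencyPoint) {
    return Math.min(maxLookbackWindowStart, consistencyPoint);
  }
}
```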



##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java:
##########
@@ -0,0 +1,99 @@
+  @Test
+  public void testLargeTimestampsNoAdjustmentNeeded() {
+    long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L;
+    long consistencyPoint = System.currentTimeMillis() - 120000L;
+
+    long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart,
+      consistencyPoint, TABLE_NAME, CF_NAME);
+
+    assertEquals(maxLookbackWindowStart, result);
+  }
+
+  @Test
+  public void testConsistencyPointFarInPastPushesWindowBack() {
+    long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L;
+    long consistencyPoint = System.currentTimeMillis() - 604800000L;
+

Review Comment:
   These tests use System.currentTimeMillis() to construct inputs, which makes 
them less deterministic than necessary. Using a fixed reference timestamp keeps 
the test purely about the adjustment logic and avoids any potential timing 
flakiness.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java:
##########
@@ -105,6 +117,16 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws
     return instance;
   }
 
+  @VisibleForTesting
+  public static void setInstanceForTesting(ReplicationLogReplayService testInstance) {
+    instance = testInstance;
+  }
+
+  @VisibleForTesting
+  public static void resetInstanceForTesting() {
+    instance = null;
+  }

Review Comment:
   resetInstanceForTesting is annotated `@VisibleForTesting` but is public, 
which makes it callable from production code and expands the supported API 
surface. Since all current call sites are in the same package, this can be 
package-private to reduce accidental misuse.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java:
##########
@@ -105,6 +117,16 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws
     return instance;
   }
 
+  @VisibleForTesting
+  public static void setInstanceForTesting(ReplicationLogReplayService testInstance) {
+    instance = testInstance;
+  }

Review Comment:
   setInstanceForTesting is annotated `@VisibleForTesting` but is public, which 
makes it callable from production code and expands the supported API surface. 
Since all current call sites are in the same package, this can be 
package-private to reduce accidental misuse.
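A sketch of the narrower visibility suggested for both test hooks. It uses a hypothetical class in place of ReplicationLogReplayService. Dropping `public` makes the hooks package-private, so only code in the same package can swap or clear the instance. The tests in org.apache.phoenix.replication.reader are in that package. The `@VisibleForTesting` annotation can stay, but it documents intent and does not by itself restrict access.

```java
/** Hypothetical singleton with package-private test hooks. */
final class TestHookSingletonSketch {
  private static volatile TestHookSingletonSketch instance;

  private TestHookSingletonSketch() {
  }

  static synchronized TestHookSingletonSketch getInstance() {
    if (instance == null) {
      instance = new TestHookSingletonSketch();
    }
    return instance;
  }

  // No access modifier: package-private, so code in other packages cannot call it.
  static void setInstanceForTesting(TestHookSingletonSketch testInstance) {
    instance = testInstance;
  }

  // Package-private as well; tests call it in @After so state does not leak between cases.
  static void resetInstanceForTesting() {
    instance = null;
  }
}
```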



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java:
##########
@@ -229,6 +251,37 @@ protected long getConsistencyPoint() throws IOException, SQLException {
     return consistencyPoint;
   }
 
+  /**
+   * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On
+   * standby clusters, this prevents compaction from dropping delete markers that have timestamps
+   * newer than the consistency point.
+   */
+  public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart,
+    Configuration conf, String tableName, String columnFamilyName) {
+    try {
+      long consistencyPoint = getInstance(conf).getConsistencyPoint();
+      return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint,
+        tableName, columnFamilyName);
+    } catch (Exception e) {
+      LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}."
+        + " Retaining all delete markers.", tableName, columnFamilyName, e);
+      return 0L;
+    }
+  }
+
+  @VisibleForTesting
+  static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart,
+    long consistencyPoint, String tableName, String columnFamilyName) {
+    long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint);
+    if (adjusted < currentMaxLookbackWindowStart) {
+      LOG.info(
+        "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}"
+          + " (consistencyPoint={})",
+        tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint);

Review Comment:
   The replication guard adjustment is likely to run during many compactions; 
logging each adjustment at INFO may generate high log volume on a lagging 
standby cluster. Similar consistency-point-related logging in the replay path 
is DEBUG, so this should likely be DEBUG as well.
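In the PR itself, the fix would be to change `LOG.info(` to `LOG.debug(` with the same `{}` arguments, leaving the fallback `LOG.warn` unchanged. The sketch below shows the same idea with java.util.logging only so it runs without dependencies. `Level.FINE` stands in for DEBUG, and the class name is hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of the adjustment with its per-compaction message at debug level. */
final class GuardLoggingSketch {
  private static final Logger LOG = Logger.getLogger(GuardLoggingSketch.class.getName());

  static long adjust(long currentWindowStart, long consistencyPoint, String table, String store) {
    long adjusted = Math.min(currentWindowStart, consistencyPoint);
    // FINE is java.util.logging's closest analogue to DEBUG and is off under the default
    // configuration, so the message is only formatted when someone has enabled it.
    if (adjusted < currentWindowStart && LOG.isLoggable(Level.FINE)) {
      LOG.fine(String.format(
        "Replication guard: table=%s store=%s maxLookbackWindowStart adjusted from %d to %d"
          + " (consistencyPoint=%d)",
        table, store, currentWindowStart, adjusted, consistencyPoint));
    }
    return adjusted;
  }
}
```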



##########
phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
+import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Integration tests for the replication consistency point compaction guard. Verifies that
+ * CompactionScanner retains delete markers with timestamps newer than the consistency point on
+ * clusters where replication replay is enabled.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class CompactionReplicationGuardIT extends BaseTest {
+
+  private static final int MAX_LOOKBACK_AGE = 15;
+  private static final int ROWS_POPULATED = 2;
+  private ManualEnvironmentEdge injectEdge;
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
+    props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
+    props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true));
+    props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
+      Boolean.toString(true));
+    props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED,
+      Boolean.toString(true));
+    props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    EnvironmentEdgeManager.reset();
+    injectEdge = new ManualEnvironmentEdge();
+    injectEdge.setValue(System.currentTimeMillis());
+    EnvironmentEdgeManager.injectEdge(injectEdge);
+  }
+
+  @After
+  public synchronized void afterTest() throws Exception {
+    ReplicationLogReplayService.resetInstanceForTesting();
+    EnvironmentEdgeManager.reset();
+    boolean refCountLeaked = isAnyStoreRefCountLeaked();
+    assertFalse("refCount leaked", refCountLeaked);
+  }
+
+  /**
+   * Test 1: Guard retains delete markers that maxLookback would have purged. The consistency point
+   * is set BEFORE the delete timestamp, so the delete marker is newer than the consistency point
+   * and must be retained even after maxLookback window passes.
+   */
+  @Test(timeout = 120000L)
+  public void testGuardRetainsDeleteMarkersNewerThanConsistencyPoint() throws Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTable(dataTableName);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+      long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis();
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Set consistency point BEFORE the delete — meaning replay hasn't caught up to the delete
+      long consistencyPoint = beforeDeleteTime - 1;
+      injectMockConsistencyPoint(consistencyPoint);
+
+      // Advance time past maxLookback window — without guard, marker would be purged
+      injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Delete marker should be retained because its timestamp > consistencyPoint
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+    }
+  }
+
+  /**
+   * Test 2: Both maxLookback and guard allow purge. The consistency point has advanced past the
+   * delete marker AND maxLookback window has passed — marker should be purged.
+   */
+  @Test(timeout = 120000L)
+  public void testDeleteMarkersPurgedWhenOlderThanBothConsistencyPointAndMaxLookback()
+    throws Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTable(dataTableName);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Advance time past maxLookback
+      injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+      // Set consistency point to current time — replay is fully caught up
+      long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis();
+      injectMockConsistencyPoint(consistencyPoint);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Delete marker should be purged — both guard and maxLookback agree
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1);
+    }
+  }
+
+  /**
+   * Test 3: MaxLookback retains even when guard wouldn't. Consistency point has advanced past the
+   * delete, but we're still within the maxLookback window — marker retained by maxLookback.
+   */
+  @Test(timeout = 120000L)
+  public void testMaxLookbackRetainsEvenWhenGuardAllowsPurge() throws Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTable(dataTableName);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Set consistency point to current time — guard would allow purge
+      long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis();
+      injectMockConsistencyPoint(consistencyPoint);
+
+      // Do NOT advance past maxLookback — still within the window
+      injectEdge.incrementValue(1);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Delete marker retained because still within maxLookback window
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+    }
+  }
+
+  /**
+   * Test 4: Guard fallback when consistency point unavailable — retains all delete markers. When
+   * the replay service throws an exception (e.g., not initialized), the guard falls back to
+   * retaining all markers to avoid data loss.
+   */
+  @Test(timeout = 120000L)
+  public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTable(dataTableName);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Inject a mock that throws — simulating uninitialized replay service
+      ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class);
+      when(mockService.getConsistencyPoint())
+        .thenThrow(new IOException("HA groups not initialized"));
+      ReplicationLogReplayService.setInstanceForTesting(mockService);
+
+      // Advance past maxLookback
+      injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Fallback retains all — delete marker NOT purged despite maxLookback passing
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+    }
+  }
+
+  private void injectMockConsistencyPoint(long consistencyPoint) throws IOException, SQLException {
+    ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class);
+    when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint);
+    ReplicationLogReplayService.setInstanceForTesting(mockService);
+  }

Review Comment:
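   injectMockConsistencyPoint never throws IOException/SQLException at runtime.
However, getConsistencyPoint() is declared `throws IOException, SQLException`,
so the `when(mockService.getConsistencyPoint())` call still requires the helper
to declare or handle them, and they cannot simply be dropped from the signature.
Catching them inside the helper and rethrowing them unchecked would simplify the
signature and make the helper easier to reuse.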
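As a cross-check on the four IT scenarios, here is a deliberately simplified model with hypothetical names. It makes three assumptions:

- A delete marker is purgeable only when its timestamp is strictly older than the effective window start.
- The window expression is the one from CompactionScanner.
- An unavailable consistency point maps to the 0L fallback.

The timestamps roughly follow the tests' edge increments: 1 ms steps, and MAX_LOOKBACK_AGE * 1000 + 1000 to pass the window. Real compaction also considers versions, TTL, and row structure, which this model ignores.

```java
import java.util.OptionalLong;

/** Hypothetical, simplified model of when a major compaction may purge a delete marker. */
final class GuardScenarioModel {

  static long effectiveWindowStart(long compactionTime, long maxLookbackMs, boolean guardEnabled,
      OptionalLong consistencyPoint) {
    long windowStart = maxLookbackMs == 0 ? compactionTime : compactionTime - (maxLookbackMs + 1);
    if (!guardEnabled) {
      return windowStart;
    }
    // Empty means the lookup failed; the guard then falls back to 0L and retains everything.
    return consistencyPoint.isPresent() ? Math.min(windowStart, consistencyPoint.getAsLong()) : 0L;
  }

  static boolean purgeable(long deleteTs, long compactionTime, long maxLookbackMs,
      boolean guardEnabled, OptionalLong consistencyPoint) {
    return deleteTs < effectiveWindowStart(compactionTime, maxLookbackMs, guardEnabled,
      consistencyPoint);
  }
}
```

In this model, Test 1 is the only scenario where the guard changes the outcome: with the guard disabled, the same inputs would be purged.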



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
