This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 6931e7167e PHOENIX-7793 Addendum Replication Log writer improvements
(#2459)
6931e7167e is described below
commit 6931e7167ee9e5fee3c18de9eb1e63eded79278b
Author: tkhurana <[email protected]>
AuthorDate: Tue May 5 14:16:44 2026 -0700
PHOENIX-7793 Addendum Replication Log writer improvements (#2459)
---
.../PhoenixWALSyncTimeoutException.java | 36 +++
.../apache/phoenix/replication/ReplicationLog.java | 78 +++---
.../phoenix/replication/ReplicationLogGroup.java | 37 ++-
.../phoenix/replication/ReplicationModeImpl.java | 12 +-
.../replication/StoreAndForwardModeImpl.java | 6 +-
.../replication/SyncAndForwardModeImpl.java | 7 +-
.../apache/phoenix/replication/SyncModeImpl.java | 7 +-
.../replication/log/LogFileFormatWriter.java | 3 +
.../phoenix/replication/log/LogFileWriter.java | 29 +--
.../metrics/MetricsReplicationLogGroupSource.java | 6 +
.../MetricsReplicationLogGroupSourceImpl.java | 11 +-
.../metrics/ReplicationLogMetricValues.java | 11 +-
.../replication/ReplicationLogBaseTest.java | 13 +-
.../replication/ReplicationLogGroupTest.java | 261 ++++++++++++++++-----
.../replication/log/LogFileWriterSyncTest.java | 7 +-
15 files changed, 359 insertions(+), 165 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/PhoenixWALSyncTimeoutException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/PhoenixWALSyncTimeoutException.java
new file mode 100644
index 0000000000..520e8ff23e
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/PhoenixWALSyncTimeoutException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a Phoenix replication log sync operation times out.
+ */
+public class PhoenixWALSyncTimeoutException extends IOException {
+
+ private static final long serialVersionUID = 1L;
+
+ public PhoenixWALSyncTimeoutException(String message) {
+ super(message);
+ }
+
+ public PhoenixWALSyncTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index b699981c24..d6b7b01535 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -77,6 +78,9 @@ public class ReplicationLog {
protected final AtomicLong rotationFailures = new AtomicLong(0);
// Staged writer created by the background LogRotationTask, drained by
checkAndReplaceWriter().
private final AtomicReference<LogFileWriter> pendingWriter = new
AtomicReference<>();
+ // Latch set by apply() before requesting an on-demand rotation; counted
down by LogRotationTask
+ // in a finally block so apply() can wait (with timeout) for a fresh writer
to be staged.
+ private volatile CountDownLatch rotationStagedLatch;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean rotationRequested = new AtomicBoolean(false);
private final ExecutorService closeExecutor;
@@ -98,12 +102,19 @@ public class ReplicationLog {
this.retryDelayMs =
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_RETRY_DELAY_MS_KEY,
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
this.rotationTimeMs = shardManager.getReplicationRoundDurationSeconds() *
1000L;
- long rotationSize =
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
- ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+ long configuredRotationSize =
+ conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
double rotationSizePercent =
conf.getDouble(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
- this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
+ long blockSize = shardManager.getFileSystem().getDefaultBlockSize();
+ if (configuredRotationSize > blockSize) {
+ LOG.warn("Configured rotation size {} exceeds HDFS block size {};
clamping to block size",
+ configuredRotationSize, blockSize);
+ }
+ long effectiveRotationSize = Math.min(configuredRotationSize, blockSize);
+ this.rotationSizeBytes = (long) (effectiveRotationSize *
rotationSizePercent);
this.maxRotationRetries =
conf.getInt(ReplicationLogGroup.REPLICATION_LOG_ROTATION_RETRIES_KEY,
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
String compressionName =
conf.get(ReplicationLogGroup.REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
@@ -142,6 +153,7 @@ public class ReplicationLog {
LogFileWriter newWriter = new LogFileWriter();
newWriter.init(writerContext);
newWriter.setGeneration(writerGeneration.incrementAndGet());
+ LOG.info("Created new writer: {}", newWriter);
return newWriter;
}
@@ -309,16 +321,15 @@ public class ReplicationLog {
} catch (IOException e) {
LOG.debug("Attempt {}/{} failed", attempt, maxAttempts, e);
if (attempt == maxAttempts) {
- closeOnError();
throw e;
}
- // First failure retries on the same writer (transient). Second failure
- // requests a new writer to recover from non-transient stream errors.
- if (attempt > 1) {
- requestRotation();
- }
+ // Each retry runs on a fresh writer. Stage a latch, request rotation,
and wait briefly
+ // for the LogRotationTask to count the latch down after staging a new
pendingWriter.
+ CountDownLatch latch = new CountDownLatch(1);
+ rotationStagedLatch = latch;
+ requestRotation();
try {
- Thread.sleep(retryDelayMs);
+ latch.await(retryDelayMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted during retry delay");
@@ -351,44 +362,31 @@ public class ReplicationLog {
}
/**
- * Force closes the log upon an unrecoverable internal error. This is a
fail-stop behavior: once
- * called, the log is marked as closed, the Disruptor is halted, and all
subsequent append() and
- * sync() calls will throw an IOException("Closed"). This ensures that no
further operations are
- * attempted on a log that has encountered a critical error.
+ * Closes the log with a bounded duration. Subsequent append() and sync()
calls will throw
+ * IOException("Closed"). Safe to call multiple times — only the first
invocation performs
+ * cleanup.
+ * @param graceful true for graceful shutdown, false for error/forced
shutdown
*/
- protected void closeOnError() {
+ public void close(boolean graceful) {
if (!closed.compareAndSet(false, true)) {
return;
}
stopRotationExecutor();
- closeExecutor.shutdownNow();
LogFileWriter staged = pendingWriter.getAndSet(null);
if (staged != null) {
- closeWriter(staged);
- }
- closeWriter(currentWriter);
- }
-
- /** Closes the log. */
- public void close() {
- if (!closed.compareAndSet(false, true)) {
- return;
+ submitClose(staged);
}
- stopRotationExecutor();
+ submitClose(currentWriter);
closeExecutor.shutdown();
try {
- if (!closeExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ if (!closeExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.warn("Close executor did not terminate in 10s, abandoning writer
closes");
closeExecutor.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
closeExecutor.shutdownNow();
}
- LogFileWriter staged = pendingWriter.getAndSet(null);
- if (staged != null) {
- closeWriter(staged);
- }
- closeWriter(currentWriter);
}
@VisibleForTesting
@@ -422,16 +420,22 @@ public class ReplicationLog {
}
rotationFailures.set(0);
logGroup.getMetrics().incrementRotationCount();
- } catch (IOException e) {
+ } catch (Throwable t) {
logGroup.getMetrics().incrementRotationFailureCount();
long numFailures = rotationFailures.incrementAndGet();
if (numFailures >= maxRotationRetries) {
LOG.error("Too many rotation failures ({}/{}), closing log",
numFailures,
- maxRotationRetries, e);
- closeOnError();
+ maxRotationRetries, t);
+ close(false);
} else {
- LOG.info("Failed to create new writer for rotation (attempt {}/{}),
retrying...",
- numFailures, maxRotationRetries, e);
+ LOG.error("Failed to create new writer for rotation (attempt {}/{}),
retrying...",
+ numFailures, maxRotationRetries, t);
+ }
+ } finally {
+ CountDownLatch latch = rotationStagedLatch;
+ if (latch != null) {
+ latch.countDown();
+ rotationStagedLatch = null;
}
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index c69292a9a1..d4ac31b493 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.jdbc.HAGroupStoreRecord;
import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
@@ -159,14 +158,15 @@ public class ReplicationLogGroup {
"phoenix.replication.log.sync.timeout.ms";
public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
"phoenix.replication.log.sync.retries";
- public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 4;
+ public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 1;
public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
"phoenix.replication.log.rotation.retries";
public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
"phoenix.replication.log.retry.delay.ms";
public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
- private static final long DEFAULT_HDFS_WRITE_RPC_TIMEOUT_MS = 30 * 1000;
+ public static final String WAL_SYNC_TIMEOUT_MS_KEY =
"hbase.regionserver.wal.sync.timeout";
+ public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
public static final String STANDBY_DIR = "in";
public static final String FALLBACK_DIR = "out";
@@ -438,22 +438,16 @@ public class ReplicationLogGroup {
}
/**
- * Calculate how long the application thread should wait for a sync to
finish. The application
- * thread here is the write rpc handler thread. It takes into account the
number of retries, pause
- * between successive attempts, dfs write timeout and zk session timeouts.
+ * Calculate how long the application thread should wait for a sync to
finish. Uses the SAF sync
+ * timeout as the base — the consumer thread must have time to attempt SYNC
retries, flip to SAF,
+ * attempt SAF retries, and either succeed or abort before the RPC handler
gives up.
* @return sync timeout in ms
*/
protected long calculateSyncTimeout() {
- int maxAttempts =
- conf.getInt(REPLICATION_LOG_SYNC_RETRIES_KEY,
DEFAULT_REPLICATION_LOG_SYNC_RETRIES) + 1;
- long retryDelayMs =
- conf.getLong(REPLICATION_LOG_RETRY_DELAY_MS_KEY,
DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
- long wrtiteRpcTimeout =
conf.getLong(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
- DEFAULT_HDFS_WRITE_RPC_TIMEOUT_MS);
+ long walSyncTimeout = conf.getLong(WAL_SYNC_TIMEOUT_MS_KEY,
DEFAULT_WAL_SYNC_TIMEOUT_MS);
// account for HAGroupStore update when we switch replication mode
long zkTimeoutMs = conf.getLong(ZK_SESSION_TIMEOUT,
DEFAULT_ZK_SESSION_TIMEOUT);
- long totalRpcTimeout = maxAttempts * wrtiteRpcTimeout + (maxAttempts - 1)
* retryDelayMs;
- return 2 * totalRpcTimeout + zkTimeoutMs;
+ return walSyncTimeout + zkTimeoutMs;
}
/**
@@ -583,10 +577,11 @@ public class ReplicationLogGroup {
LOG.error(message, e);
abort(message, e);
} catch (TimeoutException e) {
- String message = String.format("HAGroup %s sync operation timed out",
this);
- LOG.error(message);
- // sync timeout is a fatal error
- abort(message, e);
+ String message =
+ String.format("HAGroup %s replication log sync timed out after %d ms",
this, syncTimeoutMs);
+ LOG.error(message, e);
+ PhoenixWALSyncTimeoutException timeoutEx = new
PhoenixWALSyncTimeoutException(message, e);
+ abort(message, timeoutEx);
}
}
@@ -914,7 +909,7 @@ public class ReplicationLogGroup {
future.complete(null);
}
pendingSyncFutures.clear();
- LOG.info("Sync operation completed successfully up to sequence {}",
sequence);
+ LOG.debug("Sync operation completed successfully up to sequence {}",
sequence);
// after a successful sync check the mode set on the replication group
// Doing the mode check on sync points makes the implementation more
robust
// since we can guarantee that all unsynced appends have been flushed to
the
@@ -1053,6 +1048,10 @@ public class ReplicationLogGroup {
// and got an exception. This is a fatal exception so halt the
disruptor
// fail the pending sync events with the original exception
failPendingSyncs(sequence, e);
+ // Both SYNC and SAF writes failed. The local HBase WAL has the
mutation but neither
+ // replication pipeline does, so the SAF forwarder cannot reconcile
it. Abort the RS
+ // so region reassignment + preWALRestore re-ships the orphaned
edits.
+ abort("Both SYNC and SAF replication writes failed", fatalEx);
// halt the disruptor with the fatal exception
throw fatalEx;
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
index 8e569b5f31..1238513135 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
@@ -86,17 +86,9 @@ public abstract class ReplicationModeImpl {
getReplicationLog().sync();
}
- /** Graceful close */
- void closeReplicationLog() {
+ void closeReplicationLog(boolean graceful) {
if (log != null) {
- log.close();
- }
- }
-
- /** Forced close */
- void closeReplicationLogOnError() {
- if (log != null) {
- log.closeOnError();
+ log.close(graceful);
}
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index ab5c26b2da..ea18a5b853 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -105,11 +105,7 @@ public class StoreAndForwardModeImpl extends ReplicationModeImpl {
void onExit(boolean gracefulShutdown) {
LOG.info("HAGroup {} exiting mode {} graceful={}", logGroup, this,
gracefulShutdown);
stopHAGroupStoreUpdateTask();
- if (gracefulShutdown) {
- closeReplicationLog();
- } else {
- closeReplicationLogOnError();
- }
+ closeReplicationLog(gracefulShutdown);
}
@Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index 4b409633d9..aefce975cf 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -55,16 +55,13 @@ public class SyncAndForwardModeImpl extends ReplicationModeImpl {
LOG.info("HAGroup {} exiting mode {} graceful={}", logGroup, this,
gracefulShutdown);
// stop the replication log forwarding
logGroup.getLogForwarder().stop();
- if (gracefulShutdown) {
- closeReplicationLog();
- } else {
- closeReplicationLogOnError();
- }
+ closeReplicationLog(gracefulShutdown);
}
@Override
ReplicationMode onFailure(Throwable e) throws IOException {
LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
+ logGroup.getMetrics().incrementSyncToSafTransitions();
try {
logGroup.setHAGroupStatusToStoreAndForward();
} catch (Exception ex) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index b3370a8a50..75e175ad4e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -50,16 +50,13 @@ public class SyncModeImpl extends ReplicationModeImpl {
@Override
void onExit(boolean gracefulShutdown) {
LOG.info("HAGroup {} exiting mode {} graceful={}", logGroup, this,
gracefulShutdown);
- if (gracefulShutdown) {
- closeReplicationLog();
- } else {
- closeReplicationLogOnError();
- }
+ closeReplicationLog(gracefulShutdown);
}
@Override
ReplicationMode onFailure(Throwable e) throws IOException {
LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
+ logGroup.getMetrics().incrementSyncToSafTransitions();
try {
// first update the HAGroupStore state
logGroup.setHAGroupStatusToStoreAndForward();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index 2555f17225..11b427112a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -59,6 +59,9 @@ public class LogFileFormatWriter implements Closeable {
this.encoder = context.getCodec().getEncoder(blockDataStream);
// Write header immediately when file is created
writeFileHeader();
+ // Sync the header to force the first HDFS block allocation on the
caller's thread (the
+ // rotation thread). Without this, addBlock fires on the consumer thread's
first sync().
+ output.sync();
}
private void writeFileHeader() throws IOException {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index e1c3aadf49..2a0f4c5dd1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.replication.log;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.Mutation;
@@ -35,7 +36,7 @@ public class LogFileWriter implements LogFile.Writer {
private LogFileWriterContext context;
private LogFileFormatWriter writer;
- private volatile boolean closed = false;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* A monotonically increasing sequence number that identifies this writer
instance, used to detect
* log file rotations and ensure proper handling of in-flight operations.
Higher layers will get a
@@ -72,9 +73,13 @@ public class LogFileWriter implements LogFile.Writer {
LOG.debug("Initialized LogFileWriter for path {}", context.getFilePath());
}
+ public boolean isClosed() {
+ return closed.get();
+ }
+
@Override
public void append(String tableName, long commitId, Mutation mutation)
throws IOException {
- if (closed) {
+ if (isClosed()) {
throw new IOException("Writer has been closed");
}
writer.append(
@@ -83,7 +88,7 @@ public class LogFileWriter implements LogFile.Writer {
@Override
public void sync() throws IOException {
- if (closed) {
+ if (isClosed()) {
throw new IOException("Writer has been closed");
}
writer.sync();
@@ -91,7 +96,7 @@ public class LogFileWriter implements LogFile.Writer {
@Override
public long getLength() throws IOException {
- if (closed) {
+ if (isClosed()) {
// Attempt to get length from filesystem if stream is closed
if (context.getFileSystem().exists(context.getFilePath())) {
return
context.getFileSystem().getFileStatus(context.getFilePath()).getLen();
@@ -108,23 +113,19 @@ public class LogFileWriter implements LogFile.Writer {
@Override
public void close() throws IOException {
- if (closed) {
+ if (!closed.compareAndSet(false, true)) {
return;
}
- try {
- // Close the final block and write the trailer
- if (writer != null) {
- writer.close();
- }
- } finally {
- closed = true;
- LOG.debug("Closed LogFileWriter for path {}", context.getFilePath());
+ // Close the final block and write the trailer
+ if (writer != null) {
+ writer.close();
}
+ LOG.debug("Closed LogFileWriter {}", this);
}
@Override
public String toString() {
- return "LogFileWriter [formatWriter=" + writer + ", closed=" + closed + ",
generation="
+ return "LogFileWriter [formatWriter=" + writer + ", closed=" + isClosed()
+ ", generation="
+ generation + "]";
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index b3cc095048..b81df30b5f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -45,6 +45,9 @@ public interface MetricsReplicationLogGroupSource extends BaseSource {
String RING_BUFFER_TIME = "ringBufferTime";
String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
+ String SYNC_TO_SAF_TRANSITIONS = "syncToSafTransitions";
+ String SYNC_TO_SAF_TRANSITIONS_DESC = "Number of SYNC to STORE_AND_FORWARD
mode transitions";
+
/**
* Increments the counter for total log rotations. This counter tracks the
total number of times
* the log was rotated, regardless of reason.
@@ -81,6 +84,9 @@ public interface MetricsReplicationLogGroupSource extends BaseSource {
*/
void incrementRotationFailureCount();
+ /** Increment the SYNC to STORE_AND_FORWARD transition counter. */
+ void incrementSyncToSafTransitions();
+
/**
* Unregister this metrics source.
*/
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index 718f6ac63d..ea552b47cd 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -28,6 +28,7 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
private final MutableFastCounter rotationCount;
private final MutableFastCounter rotationFailuresCount;
+ private final MutableFastCounter syncToSafTransitions;
private final MutableHistogram appendTime;
private final MutableHistogram syncTime;
private final MutableHistogram rotationTime;
@@ -44,6 +45,8 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
rotationCount = getMetricsRegistry().newCounter(ROTATION_COUNT,
ROTATION_COUNT_DESC, 0L);
rotationFailuresCount =
getMetricsRegistry().newCounter(ROTATION_FAILURES,
ROTATION_FAILURES_DESC, 0L);
+ syncToSafTransitions =
+ getMetricsRegistry().newCounter(SYNC_TO_SAF_TRANSITIONS,
SYNC_TO_SAF_TRANSITIONS_DESC, 0L);
appendTime = getMetricsRegistry().newHistogram(APPEND_TIME,
APPEND_TIME_DESC);
syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME,
ROTATION_TIME_DESC);
@@ -65,6 +68,11 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
rotationFailuresCount.incr();
}
+ @Override
+ public void incrementSyncToSafTransitions() {
+ syncToSafTransitions.incr();
+ }
+
@Override
public void updateAppendTime(long timeNs) {
appendTime.add(timeNs);
@@ -88,7 +96,8 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
@Override
public ReplicationLogMetricValues getCurrentMetricValues() {
return new ReplicationLogMetricValues(rotationCount.value(),
rotationFailuresCount.value(),
- appendTime.getMax(), syncTime.getMax(), rotationTime.getMax(),
ringBufferTime.getMax());
+ syncToSafTransitions.value(), appendTime.getMax(), syncTime.getMax(),
rotationTime.getMax(),
+ ringBufferTime.getMax());
}
@Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
index ef64552ece..ac1aab4e20 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -22,15 +22,18 @@ public class ReplicationLogMetricValues {
private final long rotationCount;
private final long rotationFailuresCount;
+ private final long syncToSafTransitions;
private final long appendTime;
private final long syncTime;
private final long rotationTime;
private final long ringBufferTime;
- public ReplicationLogMetricValues(long rotationCount, long
rotationFailuresCount, long appendTime,
- long syncTime, long rotationTime, long ringBufferTime) {
+ public ReplicationLogMetricValues(long rotationCount, long
rotationFailuresCount,
+ long syncToSafTransitions, long appendTime, long syncTime, long
rotationTime,
+ long ringBufferTime) {
this.rotationCount = rotationCount;
this.rotationFailuresCount = rotationFailuresCount;
+ this.syncToSafTransitions = syncToSafTransitions;
this.appendTime = appendTime;
this.syncTime = syncTime;
this.rotationTime = rotationTime;
@@ -45,6 +48,10 @@ public class ReplicationLogMetricValues {
return rotationFailuresCount;
}
+ public long getSyncToSafTransitions() {
+ return syncToSafTransitions;
+ }
+
public long getAppendTime() {
return appendTime;
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index 5ff9fc2d41..063020523e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -113,8 +113,7 @@ public class ReplicationLogBaseTest {
storeRecord = initHAGroupStoreRecord();
doReturn(Optional.of(storeRecord)).when(haGroupStoreManager).getHAGroupStoreRecord(anyString());
- logGroup = new TestableLogGroup(conf, serverName, haGroupName,
haGroupStoreManager);
- logGroup.init();
+ logGroup = createAndInitLogGroup();
}
@After
@@ -133,8 +132,14 @@ public class ReplicationLogBaseTest {
if (logGroup != null) {
logGroup.close();
}
- logGroup = new TestableLogGroup(conf, serverName, haGroupName,
haGroupStoreManager);
- logGroup.init();
+ logGroup = createAndInitLogGroup();
+ }
+
+ private ReplicationLogGroup createAndInitLogGroup() throws Exception {
+ ReplicationLogGroup group =
+ spy(new TestableLogGroup(conf, serverName, haGroupName,
haGroupStoreManager));
+ group.init();
+ return group;
}
protected static void waitForRotationTick(int roundDurationSeconds) throws
InterruptedException {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 3bd389a0d5..727959541e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -30,9 +30,9 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
@@ -42,12 +42,15 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
@@ -259,12 +262,14 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
// Append some data
logGroup.append(tableName, commitId, put);
- // sync on the writer will timeout
+ // sync on the writer will timeout — syncInternal wraps in
PhoenixWALSyncTimeoutException
+ // and calls abort() which throws RuntimeException
try {
logGroup.sync();
fail("Should have thrown RuntimeException because sync timed out");
} catch (RuntimeException e) {
- assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+ e.getCause() instanceof PhoenixWALSyncTimeoutException);
}
// reset
doNothing().when(innerWriter).sync();
@@ -619,12 +624,14 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
anyLong(), any(Mutation.class));
// Append data. This should trigger the LogExceptionHandler, which will
close logWriter.
+ // The sync future times out, syncInternal wraps in
PhoenixWALSyncTimeoutException and aborts.
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown Runtime because sync timed out");
+ fail("Should have thrown RuntimeException because sync timed out");
} catch (RuntimeException e) {
- assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+ e.getCause() instanceof PhoenixWALSyncTimeoutException);
}
// Verify that subsequent operations fail because the log is closed
@@ -672,8 +679,9 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Try to sync. Should fail after exhausting retries and then switch to
STORE_AND_FORWARD
logGroup.sync();
- // All retries use the same writer — verify sync was attempted maxAttempts
times
- verify(initialWriter, atLeast(2)).sync();
+ // Attempt 1 syncs on initialWriter (fails), rotation creates a new writer
(also fails),
+ // attempt 2 syncs on the rotated writer (fails). Both SYNC attempts
exhausted → SAF flip.
+ verify(initialWriter, times(1)).sync();
assertEquals(STORE_AND_FORWARD, logGroup.getMode());
}
@@ -963,12 +971,14 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
doThrow(new RuntimeException("Simulated critical
error")).when(innerWriter).getLength();
// Append data. This should trigger the LogExceptionHandler, which will
close logWriter.
+ // The sync future times out; syncInternal wraps the timeout in
PhoenixWALSyncTimeoutException and aborts.
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
fail("Should have thrown RuntimeException because sync timed out");
} catch (RuntimeException e) {
- assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+ e.getCause() instanceof PhoenixWALSyncTimeoutException);
}
// Verify that subsequent operations fail because the log is closed
@@ -1008,7 +1018,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.sync();
fail("Should have thrown RuntimeException because sync timed out");
} catch (RuntimeException e) {
- assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+ e.getCause() instanceof PhoenixWALSyncTimeoutException);
}
// Verify that subsequent append operations fail because the log is closed
@@ -1048,7 +1059,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.sync();
fail("Should have thrown RuntimeException because sync timed out");
} catch (RuntimeException e) {
- assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+ e.getCause() instanceof PhoenixWALSyncTimeoutException);
}
// Verify that subsequent sync operations fail because the log is closed
@@ -1263,15 +1275,21 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
ReplicationLog activeLog = logGroup.getActiveLog();
LogFileWriter writer = activeLog.getWriter();
assertNotNull("Writer should not be null", writer);
- // keep returning the same writer
- doAnswer(invocation -> writer).when(activeLog).createNewWriter();
+
+ // Rotated writers must also fail on the 5th append so the retry doesn't
rescue the loop.
+ doAnswer(invocation -> {
+ LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+ doThrow(new IOException("Simulate append
failure")).when(w).append(tableName, commitId5,
+ put5);
+ return w;
+ }).when(activeLog).createNewWriter();
logGroup.append(tableName, commitId1, put1);
logGroup.append(tableName, commitId2, put2);
logGroup.append(tableName, commitId3, put3);
logGroup.append(tableName, commitId4, put4);
- // configure writer to throw IOException on the 5th append
+ // configure initial writer to throw IOException on the 5th append
doThrow(new IOException("Simulate append
failure")).when(writer).append(tableName, commitId5,
put5);
@@ -1456,9 +1474,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
}
/**
- * Tests that a failed replay is retried on the next attempt. The new
writer's first append fails
- * during replay, so the generation stays stale. On retry, the generation
mismatch is detected
- * again and replay succeeds.
+ * Tests that a failed replay is retried on a fresh writer. The first
rotated writer's append
+ * fails during replay. Rotation creates a second fresh writer; replay
succeeds on it.
*/
@Test
public void testReplayFailureRetries() throws Exception {
@@ -1491,27 +1508,25 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
activeLog.forceRotation();
// 3rd append triggers swap + replay of [r1, r2].
- // Attempt 1: replay fails on first record → IOException → generation
stays stale
- // Attempt 2: generation mismatch still → replay retries → succeeds → r3
appended
+ // Attempt 1 on W2: replay fails on r1 → IOException → request rotation →
creates W3
+ // Attempt 2 on W3: replay r1+r2 succeeds → r3 appended → sync succeeds
logGroup.append(tableName, commitId + 2, put);
logGroup.sync();
- LogFileWriter newWriter = activeLog.getWriter();
- assertTrue("Should be using new writer", newWriter != initialWriter);
+ LogFileWriter finalWriter = activeLog.getWriter();
+ assertTrue("Should be using a fresh writer", finalWriter != initialWriter);
- // New writer: attempt 1 replay failed on r1 (1 call). Attempt 2 replayed
r1+r2 (2 calls)
- // then appended r3. Total: r1 called 2x, r2 called 1x, r3 called 1x.
- verify(newWriter, times(2)).append(eq(tableName), eq(commitId), eq(put));
- verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1),
eq(put));
- verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 2),
eq(put));
- verify(newWriter, times(1)).sync();
+ // Final writer (W3): replayed r1+r2 then appended r3 — each exactly once.
+ verify(finalWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+ verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 1),
eq(put));
+ verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 2),
eq(put));
+ verify(finalWriter, times(1)).sync();
}
/**
- * Tests error-recovery rotation: when the current writer's stream is
broken, the second failure
- * in apply() triggers requestRotation() which submits an on-demand
LogRotationTask. During the
- * retry sleep, the background thread creates a new writer. The next attempt
drains it and
- * succeeds.
+ * Tests error-recovery rotation: when the current writer's stream is
broken, the first failure in
+ * apply() triggers requestRotation() which submits an on-demand
LogRotationTask. During the latch
+ * wait, the background thread creates a new writer. The next attempt drains
it and succeeds.
*/
@Test
public void testErrorRecoveryRequestsNewWriter() throws Exception {
@@ -1527,16 +1542,16 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
doThrow(new IOException("Simulated broken
stream")).when(initialWriter).append(anyString(),
anyLong(), any(Mutation.class));
- // Append — attempt 1 fails, attempt 2 fails + requestRotation(), during
sleep the
- // background thread creates a new writer, attempt 3 drains it and succeeds
+ // Append — attempt 1 fails on initialWriter, rotation requested, attempt
2 drains the
+ // rotated writer and succeeds
logGroup.append(tableName, commitId, put);
logGroup.sync();
LogFileWriter newWriter = activeLog.getWriter();
assertTrue("Should be using a new writer after error recovery", newWriter
!= initialWriter);
- // Old writer received failed attempts
- verify(initialWriter, atLeast(2)).append(eq(tableName), eq(commitId),
eq(put));
+ // Old writer received exactly one failed attempt
+ verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
eq(put));
// New writer received the successful append
verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
verify(newWriter, times(1)).sync();
@@ -1622,38 +1637,160 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
writerAfterRotation != writerBeforeRotation);
}
- // @Test
- public void testAppendTimeoutWhileSyncPending() throws Exception {
- final String tableName = "TESTTBL";
- final long commitId1 = 1L;
- final Mutation put1 = LogFileTestUtil.newPut("row1", 1, 1);
+ /**
+ * Tests that a RuntimeException in LogRotationTask does not suppress future
scheduled ticks.
+ * Prior to the Throwable catch fix, an unchecked exception escaping the task would
+ * silently suppress all subsequent executions of the periodic scheduled task.
+ */
+ @Test
+ public void testRuntimeExceptionInRotationDoesNotSuppressFutureTicks()
throws Exception {
+ final String tableName = "TBLRERT";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ final long commitId = 1L;
+ final int roundDurationSeconds = 5;
+
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
roundDurationSeconds);
+ recreateLogGroup();
- // Get the inner writer
ReplicationLog activeLog = logGroup.getActiveLog();
- LogFileWriter writer = activeLog.getWriter();
- assertNotNull("Writer should not be null", writer);
- // keep returning the same writer
- // doAnswer(invocation -> writer).when(activeLog).createNewWriter();
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- // Thread.sleep((long)(TEST_SYNC_TIMEOUT * 1.25)); // Simulate slow
append processing
- // throw new CallTimeoutException("Simulate append timeout");
- Object result = invocation.callRealMethod();
- sleep((long) (TEST_SYNC_TIMEOUT * 1.25)); // Simulate slow but
successful append
- return result;
+ LogFileWriter initialWriter = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // First rotation tick throws RuntimeException, subsequent ones succeed
+ AtomicBoolean shouldThrow = new AtomicBoolean(true);
+ doAnswer(invocation -> {
+ if (shouldThrow.getAndSet(false)) {
+ throw new RuntimeException("Simulated NPE in rotation");
}
- }).when(writer).append(anyString(), anyLong(), any(Mutation.class));
+ return invocation.callRealMethod();
+ }).when(activeLog).createNewWriter();
- logGroup.append(tableName, commitId1, put1);
+ // Append and sync to establish baseline
+ logGroup.append(tableName, commitId, put);
logGroup.sync();
- LogFileWriter storeAndForwardWriter = logGroup.getActiveLog().getWriter();
- assertTrue("After switching mode we should have a new writer", writer !=
storeAndForwardWriter);
- InOrder inOrder = Mockito.inOrder(storeAndForwardWriter);
- // verify that all the in-flight appends and syncs are replayed on the new
store and forward
- // writer
- inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId1), eq(put1));
- inOrder.verify(storeAndForwardWriter, times(1)).sync();
+ // Wait for first tick — RuntimeException, rotation fails
+ waitForRotationTick(roundDurationSeconds);
+
+ assertTrue("rotationFailures should be incremented",
activeLog.rotationFailures.get() >= 1);
+
+ // Wait for second tick — should succeed (scheduler not suppressed)
+ waitForRotationTick(roundDurationSeconds);
+
+ // Drain the staged writer
+ logGroup.append(tableName, commitId + 1, put);
+ logGroup.sync();
+
+ LogFileWriter writerAfterRotation = activeLog.getWriter();
+ assertTrue("Second tick should have created a new writer",
+ writerAfterRotation != initialWriter);
+ assertEquals("rotationFailures should be reset after success", 0,
+ activeLog.rotationFailures.get());
+ }
+
+ /**
+ * Tests that when both SYNC attempts and both SAF attempts fail, the abort
path fires via
+ * LogEventHandler's fatal catch block.
+ */
+ @Test
+ public void testBothSyncAndSafFailuresTriggersAbort() throws Exception {
+ final String tableName = "TBLBSSF";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Poison every writer created by any ReplicationLog (SYNC or SAF) to fail
on sync
+ Answer<Object> poisonNewWriter = invocation -> {
+ LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+ doThrow(new IOException("Simulated sync failure")).when(w).sync();
+ return w;
+ };
+ Answer<Object> poisonLog = invocation -> {
+ ReplicationLog log = (ReplicationLog) invocation.callRealMethod();
+ doAnswer(poisonNewWriter).when(log).createNewWriter();
+ return log;
+ };
+ doAnswer(poisonLog).when(logGroup).createFallbackLog();
+
+ // Poison the already-initialized SYNC log
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter initialWriter = activeLog.getWriter();
+ doThrow(new IOException("Simulated sync
failure")).when(initialWriter).sync();
+ doAnswer(poisonNewWriter).when(activeLog).createNewWriter();
+
+ logGroup.append(tableName, commitId, put);
+ try {
+ logGroup.sync();
+ fail("Should have thrown RuntimeException from abort");
+ } catch (RuntimeException e) {
+ assertTrue("Abort message should mention both SYNC and SAF",
+ e.getMessage().contains("Both SYNC and SAF replication writes failed")
+ || e.getMessage().contains("ABORTING"));
+ }
+ }
+
+ /**
+ * Tests that when already in SAF mode and both SAF attempts fail,
+ * StoreAndForwardModeImpl.onFailure() triggers abort.
+ */
+ @Test
+ public void testSafBothAttemptsFailTriggersAbort() throws Exception {
+ final String tableName = "TBLSAFBA";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Start in SAF mode
+ initialState = HAGroupState.ACTIVE_NOT_IN_SYNC;
+ storeRecord = new HAGroupStoreRecord(null, haGroupName, initialState, 0,
+ HighAvailabilityPolicy.FAILOVER.toString(), "peerZKUrl", "clusterUrl",
"peerClusterUrl",
+ localUri.toString(), peerUri.toString(), 0L);
+
doReturn(Optional.of(storeRecord)).when(haGroupStoreManager).getHAGroupStoreRecord(anyString());
+ recreateLogGroup();
+ assertEquals(STORE_AND_FORWARD, logGroup.getMode());
+
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter initialWriter = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // All SAF writers fail on sync
+ doThrow(new IOException("Simulated SAF sync
failure")).when(initialWriter).sync();
+ doAnswer(invocation -> {
+ LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+ doThrow(new IOException("Simulated SAF sync failure")).when(w).sync();
+ return w;
+ }).when(activeLog).createNewWriter();
+
+ logGroup.append(tableName, commitId, put);
+ try {
+ logGroup.sync();
+ fail("Should have thrown RuntimeException from abort");
+ } catch (RuntimeException e) {
+ assertTrue("Abort should fire from SAF failure path",
+ e.getMessage().contains("ABORTING") || e.getMessage().contains("got
error"));
+ }
+ }
+
+ /**
+ * Tests that calculateSyncTimeout derives the sync timeout from
hbase.regionserver.wal.sync.timeout and that the
+ * explicit Phoenix override still wins.
+ */
+ @Test
+ public void testCalculateSyncTimeout() throws Exception {
+ // Remove the explicit override so calculateSyncTimeout is used
+ conf.unset(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY);
+
+ // Set the HBase WAL sync timeout to a known value
+ conf.setLong("hbase.regionserver.wal.sync.timeout", 120000L);
+ recreateLogGroup();
+
+ // syncTimeoutMs should be walSyncTimeout + zkTimeout
+ long expectedZkTimeout = conf.getLong("hbase.zookeeper.session.timeout",
90000);
+ long expected = 120000L + expectedZkTimeout;
+ assertEquals("syncTimeoutMs should derive from WAL sync timeout", expected,
+ logGroup.syncTimeoutMs);
+
+ // Now set the explicit Phoenix override — it should win
+ conf.setLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY, 5000L);
+ recreateLogGroup();
+ assertEquals("Explicit override should take precedence", 5000L,
logGroup.syncTimeoutMs);
}
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
index 5f07d3b167..cd08ea5334 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.replication.log;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -69,8 +70,11 @@ public class LogFileWriterSyncTest {
// Create the writer instance to be tested
writer = new LogFileWriter();
- // Initialize the writer - this will call fs.create() and set up internal
writers
+ // Initialize the writer - this will call fs.create() and set up internal
writers.
+ // Init syncs the header to force HDFS block allocation; clear that from
invocation history
+ // so tests only verify sync behavior during append/sync operations.
writer.init(writerContext);
+ clearInvocations(internalOutput);
}
@After
@@ -205,6 +209,7 @@ public class LogFileWriterSyncTest {
new LogFileWriterContext(hflushConf).setFileSystem(hflushMockFs);
LogFileWriter hflushWriter = new LogFileWriter();
hflushWriter.init(hflushContext);
+ clearInvocations(hflushOutput);
try {
Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);