This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push:
     new 5f1c8433ab PHOENIX-7669 Provide more control over log file validation (#2231)
5f1c8433ab is described below

commit 5f1c8433abb4d3df8fcf763b8bfd2c0ae5f4649f
Author: Andrew Purtell <apurt...@apache.org>
AuthorDate: Mon Jul 21 09:42:21 2025 -0700

    PHOENIX-7669 Provide more control over log file validation (#2231)
---
 .../replication/log/InvalidLogHeaderException.java | 31 ++++++++
 .../log/InvalidLogTrailerException.java            | 30 ++++++++
 .../apache/phoenix/replication/log/LogFile.java    | 25 +++++--
 .../replication/log/LogFileFormatReader.java       | 19 +++--
 .../replication/log/LogFileFormatWriter.java       | 30 ++------
 .../phoenix/replication/log/LogFileHeader.java     | 51 ++++---------
 .../phoenix/replication/log/LogFileReader.java     |  3 +-
 .../replication/log/LogFileReaderContext.java      | 21 +++++-
 .../phoenix/replication/log/LogFileTrailer.java    | 57 +++++---------
 .../phoenix/replication/log/LogFileFormatTest.java | 87 ++++++++++++++++++++++
 .../phoenix/replication/log/LogFileWriterTest.java | 10 +++
 .../reader/ReplicationLogProcessorTest.java        |  5 +-
 12 files changed, 250 insertions(+), 119 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
new file mode 100644
index 0000000000..1b2284732d
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid header. */
+public class InvalidLogHeaderException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidLogHeaderException(String message) {
+        super(message);
+    }
+
+}
+
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
new file mode 100644
index 0000000000..61cfbd3d38
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid trailer. */
+public class InvalidLogTrailerException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidLogTrailerException(String message) {
+        super(message);
+    }
+
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
index 419313bd7e..f84d316234 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -417,21 +418,31 @@ public interface LogFile {
 
     /**
      * Utility for determining if a file is a valid replication log file.
+     * @param conf The Configuration
      * @param fs The FileSystem
      * @param path Path to the potential replication log file
+     * @param validateTrailer Whether to validate the trailer
      * @return true if the file is a valid replication log file, false otherwise
      * @throws IOException if an I/O problem was encountered
      */
-    static boolean isValidLogFile(final FileSystem fs, final Path path) throws IOException {
+    static boolean isValidLogFile(final Configuration conf, final FileSystem fs, final Path path,
+            final boolean validateTrailer) throws IOException {
         long length = fs.getFileStatus(path).getLen();
         try (FSDataInputStream in = fs.open(path)) {
-            if (LogFileTrailer.isValidTrailer(in, length)) {
-                return true;
-            } else {
-                // Not a valid trailer, do we need to do something (set a flag)?
-                // Fall back to checking the header.
-                return LogFileHeader.isValidHeader(in);
+            // Check if the file is too short to be a valid log file.
+            if (length < LogFileHeader.HEADERSIZE) {
+                return false;
             }
+            try (LogFileFormatReader reader = new LogFileFormatReader()) {
+                LogFileReaderContext context = new LogFileReaderContext(conf)
+                    .setFilePath(path)
+                    .setFileSize(length)
+                    .setValidateTrailer(validateTrailer);
+                reader.init(context, (SeekableDataInput) in);
+            } catch (InvalidLogHeaderException | InvalidLogTrailerException e) {
+                return false;
+            }
+            return true;
         }
     }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
index 194abe7f87..9d0f17a2f2 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
@@ -48,7 +48,6 @@ public class LogFileFormatReader implements Closeable {
     private ByteBuffer currentBlockBuffer;
     private long currentBlockDataBytes;
     private long currentBlockConsumedBytes;
-    private boolean trailerValidated;
     private CRC64 crc = new CRC64();
 
     public LogFileFormatReader() {
@@ -62,12 +61,14 @@ public class LogFileFormatReader implements Closeable {
         this.currentBlockConsumedBytes = 0;
         try {
             readAndValidateTrailer();
-            trailerValidated = true;
         } catch (IOException e) {
+            // If we are validating the trailer, we cannot proceed without it.
+            if (context.isValidateTrailer()) {
+                throw e;
+            }
             // Log warning, trailer might be missing or corrupt, proceed without it
-            LOG.warn("Failed to read or validate Log trailer for path: "
-                + (context != null ? context.getFilePath() : "unknown")
-                + ". Proceeding without trailer.", e);
+            LOG.warn("Failed to validate Log trailer for " + context.getFilePath()
+                + ", proceeding", e);
             trailer = null; // Ensure trailer is null if reading/validation failed
         }
         this.decoder = null;
@@ -78,8 +79,7 @@ public class LogFileFormatReader implements Closeable {
 
     private void readAndValidateTrailer() throws IOException {
         if (context.getFileSize() < LogFileTrailer.FIXED_TRAILER_SIZE) {
-            throw new IOException("File size " + context.getFileSize()
-                + " is smaller than the fixed trailer size " + LogFileTrailer.FIXED_TRAILER_SIZE);
+            throw new InvalidLogTrailerException("Short file");
         }
         LogFileTrailer ourTrailer = new LogFileTrailer();
         // Fixed trailer fields will be LogTrailer.FIXED_TRAILER_SIZE bytes back from end of file.
@@ -343,7 +343,7 @@ public class LogFileFormatReader implements Closeable {
 
     // Validates read counts against trailer counts if trailer was successfully read
     private void validateReadCounts() {
-        if (!trailerValidated || trailer == null) {
+        if (trailer == null) {
             return;
         }
         if (trailer.getBlockCount() != context.getBlocksRead()) {
@@ -374,8 +374,7 @@ public class LogFileFormatReader implements Closeable {
             + input + ", header=" + header + ", trailer=" + trailer + ", currentPosition="
             + currentPosition + ", currentBlockBuffer=" + currentBlockBuffer
             + ", currentBlockUncompressedSize=" + currentBlockDataBytes
-            + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes
-            + ", trailerValidated=" + trailerValidated + "]";
+            + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes + "]";
     }
 
     LogFile.Header getHeader() {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index 7e7f3c1931..d2778478ea 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -41,8 +41,6 @@ public class LogFileFormatWriter implements Closeable {
     private SyncableDataOutput output;
     private ByteArrayOutputStream currentBlockBytes;
     private DataOutputStream blockDataStream;
-    private boolean headerWritten = false;
-    private boolean trailerWritten = false;
     private long recordCount = 0;
     private long blockCount = 0;
     private long blocksStartOffset = -1;
@@ -60,15 +58,14 @@ public class LogFileFormatWriter implements Closeable {
         this.currentBlockBytes = new ByteArrayOutputStream();
         this.blockDataStream = new DataOutputStream(currentBlockBytes);
         this.encoder = context.getCodec().getEncoder(blockDataStream);
+        // Write header immediately when file is created
+        writeFileHeader();
     }
 
     private void writeFileHeader() throws IOException {
-        if (!headerWritten) {
-            LogFileHeader header = new LogFileHeader();
-            header.write(output);
-            blocksStartOffset = output.getPos(); // First block starts after header
-            headerWritten = true;
-        }
+        LogFileHeader header = new LogFileHeader();
+        header.write(output);
+        blocksStartOffset = output.getPos(); // First block starts after header
     }
 
     public long getBlocksStartOffset() {
@@ -76,13 +73,6 @@ public class LogFileFormatWriter implements Closeable {
     }
 
     public void append(LogFile.Record record) throws IOException {
-        if (!headerWritten) {
-            // Lazily write file header
-            writeFileHeader();
-        }
-        if (trailerWritten) {
-            throw new IOException("Cannot append record after trailer has been written");
-        }
         if (blockDataStream == null) {
             startBlock(); // Start the block if needed
         }
@@ -188,15 +178,10 @@ public class LogFileFormatWriter implements Closeable {
 
     @Override
     public void close() throws IOException {
-        // We use the fact we have already written the trailer as the boolean "closed" condition.
-        if (trailerWritten) {
+        if (output == null) {
             return;
         }
         try {
-            // We might be closing an empty file, handle this case correctly.
-            if (!headerWritten) {
-                writeFileHeader();
-            }
             // Close any outstanding block.
             closeBlock();
             // After we write the trailer we consider the file closed.
@@ -209,6 +194,7 @@ public class LogFileFormatWriter implements Closeable {
                 } catch (IOException e) {
                     LOG.error("Exception while closing LogFormatWriter", e);
                 }
+                output = null;
             }
         }
     }
@@ -220,7 +206,6 @@ public class LogFileFormatWriter implements Closeable {
             .setBlocksStartOffset(blocksStartOffset)
             .setTrailerStartOffset(output.getPos());
         trailer.write(output);
-        trailerWritten = true;
         try {
             output.sync();
         } catch (IOException e) {
@@ -233,7 +218,6 @@ public class LogFileFormatWriter implements Closeable {
     public String toString() {
         return "LogFileFormatWriter [writerContext=" + context
             + ", currentBlockUncompressedBytes=" + currentBlockBytes
-            + ", headerWritten=" + headerWritten + ", trailerWritten=" + trailerWritten
             + ", recordCount=" + recordCount + ", blockCount=" + blockCount
             + ", blocksStartOffset=" + blocksStartOffset + "]";
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
index bfc07b76d9..4fc85edabf 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
@@ -19,12 +19,10 @@ package org.apache.phoenix.replication.log;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
 
 public class LogFileHeader implements LogFile.Header {
@@ -36,7 +34,7 @@ public class LogFileHeader implements LogFile.Header {
     /** Current minor version of the replication log format */
     static final int VERSION_MINOR = 0;
 
-    static final int HEADERSIZE = MAGIC.length + 3 * Bytes.SIZEOF_BYTE;
+    static final int HEADERSIZE = MAGIC.length + 2 * Bytes.SIZEOF_BYTE;
 
     private int majorVersion = VERSION_MAJOR;
     private int minorVersion = VERSION_MINOR;
@@ -70,17 +68,25 @@ public class LogFileHeader implements LogFile.Header {
     @Override
     public void readFields(DataInput in) throws IOException {
         byte[] magic = new byte[MAGIC.length];
-        in.readFully(magic);
+        try {
+            in.readFully(magic);
+        } catch (EOFException e) {
+            throw (IOException) new InvalidLogHeaderException("Short magic").initCause(e);
+        }
         if (!Arrays.equals(MAGIC, magic)) {
-            throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic)
+            throw new InvalidLogHeaderException("Bad magic. Got " + Bytes.toStringBinary(magic)
                 + ", expected " + Bytes.toStringBinary(MAGIC));
         }
-        majorVersion = in.readByte();
-        minorVersion = in.readByte();
+        try {
+            majorVersion = in.readByte();
+            minorVersion = in.readByte();
+        } catch (EOFException e) {
+            throw (IOException) new InvalidLogHeaderException("Short version").initCause(e);
+        }
         // Basic version check for now. We assume semver conventions where only higher major
         // versions may be incompatible.
         if (majorVersion > VERSION_MAJOR) {
-            throw new IOException("Unsupported LogFile version. Got major=" + majorVersion
+            throw new InvalidLogHeaderException("Unsupported version. Got major=" + majorVersion
                 + " minor=" + minorVersion + ", expected major=" + VERSION_MAJOR
                 + " minor=" + VERSION_MINOR);
         }
@@ -98,33 +104,6 @@ public class LogFileHeader implements LogFile.Header {
         return HEADERSIZE;
     }
 
-    public static boolean isValidHeader(final FileSystem fs, final Path path)
-            throws IOException {
-        if (fs.getFileStatus(path).getLen() < HEADERSIZE) {
-            return false;
-        }
-        try (FSDataInputStream in = fs.open(path)) {
-            return isValidHeader(in);
-        }
-    }
-
-    public static boolean isValidHeader(FSDataInputStream in) throws IOException {
-        in.seek(0);
-        byte[] magic = new byte[MAGIC.length];
-        in.readFully(magic);
-        if (!Arrays.equals(MAGIC, magic)) {
-            return false;
-        }
-        int majorVersion = in.readByte();
-        in.readByte(); // minorVersion, for now we don't use it
-        // Basic version check for now. We assume semver conventions where only higher major
-        // versions may be incompatible.
-        if (majorVersion > VERSION_MAJOR) {
-            return false;
-        }
-        return true;
-    }
-
     @Override
     public String toString() {
         return "LogFileHeader [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
index 6b3fa028b2..1961922b7a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
@@ -137,7 +137,8 @@ public class LogFileReader implements LogFile.Reader  {
             throw e;
         } finally {
             closed = true;
-            LOG.debug("Closed LogFileReader for path {}", context.getFilePath());
+            LOG.debug("Closed LogFileReader for path {}", context != null ? context.getFilePath()
+                : "null");
         }
     }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
index 52fbdb0cbb..a2c6de3488 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
@@ -36,12 +36,17 @@ public class LogFileReaderContext {
     /** Default for skipping corrupt blocks */
     public static final boolean DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS = true;
 
+    public static final String LOGFILE_VALIDATE_TRAILER =
+        "phoenix.replication.logfile.validate.trailer";
+    public static final boolean DEFAULT_LOGFILE_VALIDATE_TRAILER = true;
+
     private final Configuration conf;
     private FileSystem fs;
     private Path path;
     private LogFileCodec codec;
     private long fileSize = -1;
     private boolean isSkipCorruptBlocks;
+    private boolean isValidateTrailer;
     private long blocksRead;
     private long recordsRead;
     private long corruptBlocksSkipped;
@@ -50,6 +55,8 @@ public class LogFileReaderContext {
         this.conf = conf;
         this.isSkipCorruptBlocks = conf.getBoolean(LOGFILE_SKIP_CORRUPT_BLOCKS,
             DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS);
+        this.isValidateTrailer = conf.getBoolean(LOGFILE_VALIDATE_TRAILER,
+            DEFAULT_LOGFILE_VALIDATE_TRAILER);
         // Note: When we have multiple codec types, instantiate the appropriate type based on
         // configuration;
         this.codec = new LogFileCodec();
@@ -146,11 +153,21 @@ public class LogFileReaderContext {
         return this;
     }
 
+    public boolean isValidateTrailer() {
+        return isValidateTrailer;
+    }
+
+    public LogFileReaderContext setValidateTrailer(boolean validateTrailer) {
+        this.isValidateTrailer = validateTrailer;
+        return this;
+    }
+
     @Override
     public String toString() {
         return "LogFileReaderContext [filePath=" + path + ", fileSize=" + fileSize
-            + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", codec=" + codec + ", blocksRead="
-            + blocksRead + ", recordsRead=" + recordsRead + ", corruptBlocksSkipped="
+            + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", isValidateTrailer="
+            + isValidateTrailer + ", codec=" + codec + ", blocksRead=" + blocksRead
+            + ", recordsRead=" + recordsRead + ", corruptBlocksSkipped="
             + corruptBlocksSkipped + "]";
     }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
index b05da86aae..8e29bf288a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
@@ -19,12 +19,10 @@ package org.apache.phoenix.replication.log;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
 
 public class LogFileTrailer implements LogFile.Trailer {
@@ -113,23 +111,31 @@ public class LogFileTrailer implements LogFile.Trailer {
     }
 
     public void readFixedFields(DataInput in) throws IOException {
-        this.recordCount = in.readLong();
-        this.blockCount = in.readLong();
-        this.blocksStartOffset = in.readLong();
-        this.trailerStartOffset = in.readLong();
-        this.majorVersion = in.readByte();
-        this.minorVersion = in.readByte();
+        try {
+            this.recordCount = in.readLong();
+            this.blockCount = in.readLong();
+            this.blocksStartOffset = in.readLong();
+            this.trailerStartOffset = in.readLong();
+            this.majorVersion = in.readByte();
+            this.minorVersion = in.readByte();
+        } catch (EOFException e) {
+            throw (IOException) new InvalidLogTrailerException("Short fixed fields").initCause(e);
+        }
         // Basic version check for now. We assume semver conventions where only higher major
         // versions may be incompatible.
         if (majorVersion > LogFileHeader.VERSION_MAJOR) {
-            throw new IOException("Unsupported LogFile version. Got major=" + majorVersion
+            throw new InvalidLogTrailerException("Unsupported version. Got major=" + majorVersion
                 + " minor=" + minorVersion + ", expected major=" + LogFileHeader.VERSION_MAJOR
                 + " minor=" + LogFileHeader.VERSION_MINOR);
         }
         byte[] magic = new byte[LogFileHeader.MAGIC.length];
-        in.readFully(magic);
+        try {
+            in.readFully(magic);
+        } catch (EOFException e) {
+            throw (IOException) new InvalidLogTrailerException("Short magic").initCause(e);
+        }
         if (!Arrays.equals(LogFileHeader.MAGIC, magic)) {
-            throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic)
+            throw new InvalidLogTrailerException("Bad magic. Got " + Bytes.toStringBinary(magic)
                 + ", expected " + Bytes.toStringBinary(LogFileHeader.MAGIC));
         }
     }
@@ -172,33 +178,6 @@ public class LogFileTrailer implements LogFile.Trailer {
             + FIXED_TRAILER_SIZE;
     }
 
-    public static boolean isValidTrailer(final FileSystem fs, final Path path) throws IOException {
-        try (FSDataInputStream in = fs.open(path)) {
-            return isValidTrailer(in, fs.getFileStatus(path).getLen());
-        }
-    }
-
-    public static boolean isValidTrailer(FSDataInputStream in, long length) throws IOException {
-        long offset = length - VERSION_AND_MAGIC_SIZE;
-        if (offset < 0) {
-            return false;
-        }
-        in.seek(offset);
-        byte[] magic = new byte[LogFileHeader.MAGIC.length];
-        in.readFully(magic);
-        if (!Arrays.equals(LogFileHeader.MAGIC, magic)) {
-            return false;
-        }
-        int majorVersion = in.readByte();
-        in.readByte(); // minorVersion, for now we don't use it
-        // Basic version check for now. We assume semver conventions where only higher major
-        // versions may be incompatible.
-        if (majorVersion > LogFileHeader.VERSION_MAJOR) {
-            return false;
-        }
-        return true;
-    }
-
     @Override
     public String toString() {
         return "LogFileTrailer [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
index 875cb6ea1b..f623fb9f0a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
@@ -296,6 +296,9 @@ public class LogFileFormatTest {
         byte[] data = writerBaos.toByteArray();
         byte[] truncatedData = Arrays.copyOf(data, (int) truncationPoint);
 
+        // We truncated the final block, so the trailer is gone too.
+        readerContext.setValidateTrailer(false);
+
         initLogFileReader(truncatedData);
 
         List<LogFile.Record> decoded = new ArrayList<>();
@@ -342,6 +345,7 @@ public class LogFileFormatTest {
         LogFileTestUtil.SeekableByteArrayInputStream input =
             new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData);
         readerContext.setFileSize(truncatedData.length);
+        readerContext.setValidateTrailer(false);
         // This init should log a warning but succeed
         reader.init(readerContext, input);
 
@@ -377,6 +381,7 @@ public class LogFileFormatTest {
         LogFileTestUtil.SeekableByteArrayInputStream input =
             new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData);
         readerContext.setFileSize(truncatedData.length);
+        readerContext.setValidateTrailer(false);
         // Init should log a warning but succeed by ignoring the trailer
         reader.init(readerContext, input);
 
@@ -406,6 +411,88 @@ public class LogFileFormatTest {
         assertEquals("Records read count mismatch", totalRecords, readerContext.getRecordsRead());
     }
 
+    @Test
+    public void testFailIfMissingHeader() throws IOException {
+        // Zero length file
+        byte[] data = new byte[0];
+        LogFileTestUtil.SeekableByteArrayInputStream input =
+            new LogFileTestUtil.SeekableByteArrayInputStream(data);
+        readerContext.setFileSize(data.length);
+        readerContext.setValidateTrailer(false);
+        try {
+            reader.init(readerContext, input);
+            fail("Expected InvalidLogHeaderException for zero length file");
+        } catch (InvalidLogHeaderException e) {
+            assertTrue("Exception message should contain 'Short magic'",
+                e.getMessage().contains("Short magic"));
+        }
+    }
+
+    @Test
+    public void testFailIfInvalidHeader() throws IOException {
+        initLogFileWriter();
+        writer.close(); // Writes valid trailer
+        byte[] data = writerBaos.toByteArray();
+        LogFileTestUtil.SeekableByteArrayInputStream input =
+            new LogFileTestUtil.SeekableByteArrayInputStream(data);
+        readerContext.setFileSize(data.length);
+        readerContext.setValidateTrailer(true);
+        data[0] = (byte) 'X'; // Corrupt the first magic byte
+        try {
+            reader.init(readerContext, input);
+            fail("Expected InvalidLogHeaderException for file with corrupted header magic");
+        } catch (InvalidLogHeaderException e) {
+            assertTrue("Exception message should contain 'Bad magic'",
+                e.getMessage().contains("Bad magic"));
+        }
+    }
+
+    @Test
+    public void testFailIfMissingTrailer() throws IOException {
+        initLogFileWriter();
+        writeBlock(writer, "B1", 0, 5);
+        // Don't close the writer, simulate missing trailer
+        byte[] data = writerBaos.toByteArray();
+        // Re-initialize reader with truncated data and trailer validation enabled
+        LogFileTestUtil.SeekableByteArrayInputStream input =
+            new LogFileTestUtil.SeekableByteArrayInputStream(data);
+        readerContext.setFileSize(data.length);
+        // Enable trailer validation
+        readerContext.setValidateTrailer(true);
+        try {
+            reader.init(readerContext, input);
+            fail("Expected InvalidLogTrailerException when trailer is missing");
+        } catch (InvalidLogTrailerException e) {
+            assertTrue("Exception message should contain 'Unsupported version'",
+                e.getMessage().contains("Unsupported version"));
+        }
+    }
+
+    @Test
+    public void testFailIfInvalidTrailer() throws IOException {
+        initLogFileWriter();
+        writeBlock(writer, "B1", 0, 5);
+        writer.close(); // Writes valid trailer
+        byte[] data = writerBaos.toByteArray();
+        // Corrupt the trailer by changing the magic bytes
+        int trailerStartOffset = data.length - LogFileTrailer.FIXED_TRAILER_SIZE;
+        int magicOffset = trailerStartOffset + LogFileTrailer.FIXED_TRAILER_SIZE
+            - LogFileHeader.MAGIC.length;
+        data[magicOffset] = (byte) 'X'; // Corrupt the first magic byte
+        // Re-initialize reader with corrupted trailer and trailer validation enabled
+        LogFileTestUtil.SeekableByteArrayInputStream input =
+            new LogFileTestUtil.SeekableByteArrayInputStream(data);
+        readerContext.setFileSize(data.length);
+        readerContext.setValidateTrailer(true);
+        try {
+            reader.init(readerContext, input);
+            fail("Expected InvalidLogTrailerException when trailer magic is corrupt");
+        } catch (InvalidLogTrailerException e) {
+            assertTrue("Exception message should contain 'Bad magic'",
+                e.getMessage().contains("Bad magic"));
+        }
+    }
+
     @Test
     public void testLogFileCorruptionFirstBlockChecksum() throws IOException {
         initLogFileWriter();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
index 55c4737223..b4e828a3b6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
@@ -137,6 +137,16 @@ public class LogFileWriterTest {
         reader.close();
     }
 
+    @Test
+    public void testHeaderWrittenImmediately() throws IOException {
+        // This should write header immediately
+        initLogFileWriter();
+        // Verify file exists and has content (header should be written)
+        assertTrue("File should exist after init", localFs.exists(filePath));
+        assertEquals("File should have header written", LogFileHeader.HEADERSIZE, writer.getLength());
+        writer.close();
+    }
+
     private void initLogFileReader() throws IOException {
         readerContext = new LogFileReaderContext(conf).setFileSystem(localFs)
             .setFilePath(filePath);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
index e97cd027e9..2bbc18a35d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
@@ -500,7 +500,10 @@ public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
         writer.append(tableNameString, 1, put);
         writer.sync();
 
-        ReplicationLogProcessor spyProcessor = Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
+        // For processing of an unclosed file to work, we need to disable trailer validation
+        Configuration testConf = new Configuration(conf);
+        testConf.setBoolean(LogFileReaderContext.LOGFILE_VALIDATE_TRAILER, false);
+        ReplicationLogProcessor spyProcessor = Mockito.spy(new ReplicationLogProcessor(testConf, testHAGroupName));
 
         // Create argument captor to capture the actual parameters passed to processReplicationLogBatch
         ArgumentCaptor<Map<TableName, List<Mutation>>> mapCaptor =

Reply via email to