This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 755a47a7cf PHOENIX-7669 Enhance Header and Trailer validations to gracefully handle unclosed files (#2341)
755a47a7cf is described below

commit 755a47a7cf4d1f0cf68029be54979d36a36835e5
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sat Jan 10 05:58:12 2026 +0530

    PHOENIX-7669 Enhance Header and Trailer validations to gracefully handle unclosed files (#2341)
    
    Co-authored-by: Himanshu Gwalani <[email protected]>
---
 .../replication/log/InvalidLogHeaderException.java | 30 ++++++++
 .../log/InvalidLogTrailerException.java            | 30 ++++++++
 .../apache/phoenix/replication/log/LogFile.java    | 27 ++++---
 .../replication/log/LogFileFormatReader.java       | 19 ++---
 .../replication/log/LogFileFormatWriter.java       | 32 ++------
 .../phoenix/replication/log/LogFileHeader.java     | 53 +++++--------
 .../phoenix/replication/log/LogFileReader.java     |  3 +-
 .../replication/log/LogFileReaderContext.java      | 22 +++++-
 .../phoenix/replication/log/LogFileTrailer.java    | 59 +++++----------
 .../reader/ReplicationLogProcessorTestIT.java      |  5 +-
 .../phoenix/replication/log/LogFileFormatTest.java | 87 ++++++++++++++++++++++
 .../phoenix/replication/log/LogFileWriterTest.java | 10 +++
 12 files changed, 252 insertions(+), 125 deletions(-)
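
For readers skimming the change, a minimal caller sketch of the revised validity check. This sketch is not part of the patch; only the LogFile.isValidLogFile(conf, fs, path, validateTrailer) signature and the Hadoop Configuration/FileSystem/Path types come from the diff below, while the class name and variables are illustrative.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.phoenix.replication.log.LogFile;

    public class LogFileValidationSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(args[0]);
        // Strict check: a properly closed log must also carry a valid trailer.
        boolean closedOk = LogFile.isValidLogFile(conf, fs, path, true);
        // Lenient check: an unclosed file may legitimately lack a trailer, so
        // only the header is required to be valid.
        boolean unclosedOk = LogFile.isValidLogFile(conf, fs, path, false);
        System.out.println("strict=" + closedOk + ", lenient=" + unclosedOk);
      }
    }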

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
new file mode 100644
index 0000000000..500b8b003f
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid header. */
+public class InvalidLogHeaderException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public InvalidLogHeaderException(String message) {
+    super(message);
+  }
+
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
new file mode 100644
index 0000000000..4b528d1be9
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid trailer. */
+public class InvalidLogTrailerException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public InvalidLogTrailerException(String message) {
+    super(message);
+  }
+
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
index 631146c4aa..b1f05f4af9 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -418,21 +419,29 @@ public interface LogFile {
 
   /**
    * Utility for determining if a file is a valid replication log file.
-   * @param fs   The FileSystem
-   * @param path Path to the potential replication log file
+   * @param conf            The Configuration
+   * @param fs              The FileSystem
+   * @param path            Path to the potential replication log file
+   * @param validateTrailer Whether to validate the trailer
    * @return true if the file is a valid replication log file, false otherwise
    * @throws IOException if an I/O problem was encountered
    */
-  static boolean isValidLogFile(final FileSystem fs, final Path path) throws IOException {
+  static boolean isValidLogFile(final Configuration conf, final FileSystem fs, final Path path,
+    final boolean validateTrailer) throws IOException {
     long length = fs.getFileStatus(path).getLen();
     try (FSDataInputStream in = fs.open(path)) {
-      if (LogFileTrailer.isValidTrailer(in, length)) {
-        return true;
-      } else {
-        // Not a valid trailer, do we need to do something (set a flag)?
-        // Fall back to checking the header.
-        return LogFileHeader.isValidHeader(in);
+      // Check if the file is too short to be a valid log file.
+      if (length < LogFileHeader.HEADERSIZE) {
+        return false;
       }
+      try (LogFileFormatReader reader = new LogFileFormatReader()) {
+        LogFileReaderContext context = new LogFileReaderContext(conf).setFilePath(path)
+          .setFileSize(length).setValidateTrailer(validateTrailer);
+        reader.init(context, (SeekableDataInput) in);
+      } catch (InvalidLogHeaderException | InvalidLogTrailerException e) {
+        return false;
+      }
+      return true;
     }
   }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
index 2350ba7c7a..6fbdf6d645 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
@@ -47,7 +47,6 @@ public class LogFileFormatReader implements Closeable {
   private ByteBuffer currentBlockBuffer;
   private long currentBlockDataBytes;
   private long currentBlockConsumedBytes;
-  private boolean trailerValidated;
   private CRC64 crc = new CRC64();
 
   public LogFileFormatReader() {
@@ -61,13 +60,13 @@ public class LogFileFormatReader implements Closeable {
     this.currentBlockConsumedBytes = 0;
     try {
       readAndValidateTrailer();
-      trailerValidated = true;
     } catch (IOException e) {
+      // If we are validating the trailer, we cannot proceed without it.
+      if (context.isValidateTrailer()) {
+        throw e;
+      }
       // Log warning, trailer might be missing or corrupt, proceed without it
-      LOG.warn(
-        "Failed to read or validate Log trailer for path: "
-          + (context != null ? context.getFilePath() : "unknown") + ". Proceeding without trailer.",
-        e);
+      LOG.warn("Failed to validate Log trailer for " + context.getFilePath() + ", proceeding", e);
       trailer = null; // Ensure trailer is null if reading/validation failed
     }
     this.decoder = null;
@@ -78,8 +77,7 @@ public class LogFileFormatReader implements Closeable {
 
   private void readAndValidateTrailer() throws IOException {
     if (context.getFileSize() < LogFileTrailer.FIXED_TRAILER_SIZE) {
-      throw new IOException("File size " + context.getFileSize()
-        + " is smaller than the fixed trailer size " + LogFileTrailer.FIXED_TRAILER_SIZE);
+      throw new InvalidLogTrailerException("Short file");
     }
     LogFileTrailer ourTrailer = new LogFileTrailer();
     // Fixed trailer fields will be LogTrailer.FIXED_TRAILER_SIZE bytes back from end of file.
@@ -337,7 +335,7 @@ public class LogFileFormatReader implements Closeable {
 
   // Validates read counts against trailer counts if trailer was successfully read
   private void validateReadCounts() {
-    if (!trailerValidated || trailer == null) {
+    if (trailer == null) {
       return;
     }
     if (trailer.getBlockCount() != context.getBlocksRead()) {
@@ -367,8 +365,7 @@ public class LogFileFormatReader implements Closeable {
     return "LogFileFormatReader [context=" + context + ", decoder=" + decoder + ", input=" + input
       + ", header=" + header + ", trailer=" + trailer + ", currentPosition=" + currentPosition
       + ", currentBlockBuffer=" + currentBlockBuffer + ", currentBlockUncompressedSize="
-      + currentBlockDataBytes + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes
-      + ", trailerValidated=" + trailerValidated + "]";
+      + currentBlockDataBytes + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes + "]";
   }
 
   LogFile.Header getHeader() {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index 30fab50bf4..be94364a80 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -40,8 +40,6 @@ public class LogFileFormatWriter implements Closeable {
   private SyncableDataOutput output;
   private ByteArrayOutputStream currentBlockBytes;
   private DataOutputStream blockDataStream;
-  private boolean headerWritten = false;
-  private boolean trailerWritten = false;
   private long recordCount = 0;
   private long blockCount = 0;
   private long blocksStartOffset = -1;
@@ -59,15 +57,14 @@ public class LogFileFormatWriter implements Closeable {
     this.currentBlockBytes = new ByteArrayOutputStream();
     this.blockDataStream = new DataOutputStream(currentBlockBytes);
     this.encoder = context.getCodec().getEncoder(blockDataStream);
+    // Write header immediately when file is created
+    writeFileHeader();
   }
 
   private void writeFileHeader() throws IOException {
-    if (!headerWritten) {
-      LogFileHeader header = new LogFileHeader();
-      header.write(output);
-      blocksStartOffset = output.getPos(); // First block starts after header
-      headerWritten = true;
-    }
+    LogFileHeader header = new LogFileHeader();
+    header.write(output);
+    blocksStartOffset = output.getPos(); // First block starts after header
   }
 
   public long getBlocksStartOffset() {
@@ -75,13 +72,6 @@ public class LogFileFormatWriter implements Closeable {
   }
 
   public void append(LogFile.Record record) throws IOException {
-    if (!headerWritten) {
-      // Lazily write file header
-      writeFileHeader();
-    }
-    if (trailerWritten) {
-      throw new IOException("Cannot append record after trailer has been written");
-    }
     if (blockDataStream == null) {
       startBlock(); // Start the block if needed
     }
@@ -185,15 +175,10 @@ public class LogFileFormatWriter implements Closeable {
 
   @Override
   public void close() throws IOException {
-    // We use the fact we have already written the trailer as the boolean "closed" condition.
-    if (trailerWritten) {
+    if (output == null) {
       return;
     }
     try {
-      // We might be closing an empty file, handle this case correctly.
-      if (!headerWritten) {
-        writeFileHeader();
-      }
       // Close any outstanding block.
       closeBlock();
       // After we write the trailer we consider the file closed.
@@ -206,6 +191,7 @@ public class LogFileFormatWriter implements Closeable {
         } catch (IOException e) {
           LOG.error("Exception while closing LogFormatWriter", e);
         }
+        output = null;
       }
     }
   }
@@ -215,7 +201,6 @@ public class LogFileFormatWriter implements Closeable {
       new LogFileTrailer().setRecordCount(recordCount).setBlockCount(blockCount)
         .setBlocksStartOffset(blocksStartOffset).setTrailerStartOffset(output.getPos());
     trailer.write(output);
-    trailerWritten = true;
     try {
       output.sync();
     } catch (IOException e) {
@@ -227,8 +212,7 @@ public class LogFileFormatWriter implements Closeable {
   @Override
   public String toString() {
     return "LogFileFormatWriter [writerContext=" + context + ", currentBlockUncompressedBytes="
-      + currentBlockBytes + ", headerWritten=" + headerWritten + ", trailerWritten="
-      + trailerWritten + ", recordCount=" + recordCount + ", blockCount=" + blockCount
+      + currentBlockBytes + ", recordCount=" + recordCount + ", blockCount=" + blockCount
       + ", blocksStartOffset=" + blocksStartOffset + "]";
   }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
index c15260b3a5..fdaced70d3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
@@ -19,11 +19,9 @@ package org.apache.phoenix.replication.log;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
 
 public class LogFileHeader implements LogFile.Header {
@@ -35,7 +33,7 @@ public class LogFileHeader implements LogFile.Header {
   /** Current minor version of the replication log format */
   static final int VERSION_MINOR = 0;
 
-  static final int HEADERSIZE = MAGIC.length + 3 * Bytes.SIZEOF_BYTE;
+  static final int HEADERSIZE = MAGIC.length + 2 * Bytes.SIZEOF_BYTE;
 
   private int majorVersion = VERSION_MAJOR;
   private int minorVersion = VERSION_MINOR;
@@ -69,18 +67,27 @@ public class LogFileHeader implements LogFile.Header {
   @Override
   public void readFields(DataInput in) throws IOException {
     byte[] magic = new byte[MAGIC.length];
-    in.readFully(magic);
+    try {
+      in.readFully(magic);
+    } catch (EOFException e) {
+      throw (IOException) new InvalidLogHeaderException("Short magic").initCause(e);
+    }
     if (!Arrays.equals(MAGIC, magic)) {
-      throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic)
+      throw new InvalidLogHeaderException("Bad magic. Got " + Bytes.toStringBinary(magic)
         + ", expected " + Bytes.toStringBinary(MAGIC));
     }
-    majorVersion = in.readByte();
-    minorVersion = in.readByte();
+    try {
+      majorVersion = in.readByte();
+      minorVersion = in.readByte();
+    } catch (EOFException e) {
+      throw (IOException) new InvalidLogHeaderException("Short version").initCause(e);
+    }
     // Basic version check for now. We assume semver conventions where only higher major
     // versions may be incompatible.
     if (majorVersion > VERSION_MAJOR) {
-      throw new IOException("Unsupported LogFile version. Got major=" + majorVersion + " minor="
-        + minorVersion + ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR);
+      throw new InvalidLogHeaderException(
+        "Unsupported version. Got major=" + majorVersion + " minor=" + minorVersion
+          + ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR);
     }
   }
 
@@ -96,32 +103,6 @@ public class LogFileHeader implements LogFile.Header {
     return HEADERSIZE;
   }
 
-  public static boolean isValidHeader(final FileSystem fs, final Path path) throws IOException {
-    if (fs.getFileStatus(path).getLen() < HEADERSIZE) {
-      return false;
-    }
-    try (FSDataInputStream in = fs.open(path)) {
-      return isValidHeader(in);
-    }
-  }
-
-  public static boolean isValidHeader(FSDataInputStream in) throws IOException {
-    in.seek(0);
-    byte[] magic = new byte[MAGIC.length];
-    in.readFully(magic);
-    if (!Arrays.equals(MAGIC, magic)) {
-      return false;
-    }
-    int majorVersion = in.readByte();
-    in.readByte(); // minorVersion, for now we don't use it
-    // Basic version check for now. We assume semver conventions where only higher major
-    // versions may be incompatible.
-    if (majorVersion > VERSION_MAJOR) {
-      return false;
-    }
-    return true;
-  }
-
   @Override
   public String toString() {
     return "LogFileHeader [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion + "]";
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
index 4e4bd4b67b..8b24f6c0c1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
@@ -134,7 +134,8 @@ public class LogFileReader implements LogFile.Reader {
       throw e;
     } finally {
       closed = true;
-      LOG.debug("Closed LogFileReader for path {}", context.getFilePath());
+      LOG.debug("Closed LogFileReader for path {}",
+        context != null ? context.getFilePath() : "null");
     }
   }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
index 5573df7599..8fbab2e8da 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
@@ -35,12 +35,17 @@ public class LogFileReaderContext {
   /** Default for skipping corrupt blocks */
   public static final boolean DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS = true;
 
+  public static final String LOGFILE_VALIDATE_TRAILER =
+    "phoenix.replication.logfile.validate.trailer";
+  public static final boolean DEFAULT_LOGFILE_VALIDATE_TRAILER = true;
+
   private final Configuration conf;
   private FileSystem fs;
   private Path path;
   private LogFileCodec codec;
   private long fileSize = -1;
   private boolean isSkipCorruptBlocks;
+  private boolean isValidateTrailer;
   private long blocksRead;
   private long recordsRead;
   private long corruptBlocksSkipped;
@@ -49,6 +54,8 @@ public class LogFileReaderContext {
     this.conf = conf;
     this.isSkipCorruptBlocks =
       conf.getBoolean(LOGFILE_SKIP_CORRUPT_BLOCKS, DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS);
+    this.isValidateTrailer =
+      conf.getBoolean(LOGFILE_VALIDATE_TRAILER, DEFAULT_LOGFILE_VALIDATE_TRAILER);
     // Note: When we have multiple codec types, instantiate the appropriate type based on
     // configuration;
     this.codec = new LogFileCodec();
@@ -145,12 +152,21 @@ public class LogFileReaderContext {
     return this;
   }
 
+  public boolean isValidateTrailer() {
+    return isValidateTrailer;
+  }
+
+  public LogFileReaderContext setValidateTrailer(boolean validateTrailer) {
+    this.isValidateTrailer = validateTrailer;
+    return this;
+  }
+
   @Override
   public String toString() {
     return "LogFileReaderContext [filePath=" + path + ", fileSize=" + fileSize
-      + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", codec=" + codec + ", blocksRead="
-      + blocksRead + ", recordsRead=" + recordsRead + ", corruptBlocksSkipped="
-      + corruptBlocksSkipped + "]";
+      + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", isValidateTrailer=" + isValidateTrailer
+      + ", codec=" + codec + ", blocksRead=" + blocksRead + ", recordsRead=" + recordsRead
+      + ", corruptBlocksSkipped=" + corruptBlocksSkipped + "]";
   }
 
 }
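
As a usage note (not part of the patch): the trailer-validation knob added above can be set either through the configuration key or per reader via the builder-style setter. Only LOGFILE_VALIDATE_TRAILER, its default of true, and the setters shown come from this diff; the surrounding class and method are an illustrative sketch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.phoenix.replication.log.LogFileReaderContext;

    public class ReaderContextConfigSketch {
      static LogFileReaderContext lenientContext(FileSystem fs, Path path) {
        // Relax trailer validation globally via configuration (the default is true)...
        Configuration conf = new Configuration();
        conf.setBoolean(LogFileReaderContext.LOGFILE_VALIDATE_TRAILER, false);
        // ...and/or override it for this one reader with the new builder-style setter.
        return new LogFileReaderContext(conf)
          .setFileSystem(fs)
          .setFilePath(path)
          .setValidateTrailer(false);
      }
    }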
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
index 84433e0309..2f77d498d8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
@@ -19,11 +19,9 @@ package org.apache.phoenix.replication.log;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
 
 public class LogFileTrailer implements LogFile.Trailer {
@@ -112,23 +110,31 @@ public class LogFileTrailer implements LogFile.Trailer {
   }
 
   public void readFixedFields(DataInput in) throws IOException {
-    this.recordCount = in.readLong();
-    this.blockCount = in.readLong();
-    this.blocksStartOffset = in.readLong();
-    this.trailerStartOffset = in.readLong();
-    this.majorVersion = in.readByte();
-    this.minorVersion = in.readByte();
+    try {
+      this.recordCount = in.readLong();
+      this.blockCount = in.readLong();
+      this.blocksStartOffset = in.readLong();
+      this.trailerStartOffset = in.readLong();
+      this.majorVersion = in.readByte();
+      this.minorVersion = in.readByte();
+    } catch (EOFException e) {
+      throw (IOException) new InvalidLogTrailerException("Short fixed fields").initCause(e);
+    }
     // Basic version check for now. We assume semver conventions where only higher major
     // versions may be incompatible.
     if (majorVersion > LogFileHeader.VERSION_MAJOR) {
-      throw new IOException("Unsupported LogFile version. Got major=" + majorVersion + " minor="
-        + minorVersion + ", expected major=" + LogFileHeader.VERSION_MAJOR + " minor="
+      throw new InvalidLogTrailerException("Unsupported version. Got major=" + majorVersion
+        + " minor=" + minorVersion + ", expected major=" + LogFileHeader.VERSION_MAJOR + " minor="
         + LogFileHeader.VERSION_MINOR);
     }
     byte[] magic = new byte[LogFileHeader.MAGIC.length];
-    in.readFully(magic);
+    try {
+      in.readFully(magic);
+    } catch (EOFException e) {
+      throw (IOException) new InvalidLogTrailerException("Short magic").initCause(e);
+    }
     if (!Arrays.equals(LogFileHeader.MAGIC, magic)) {
-      throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic)
+      throw new InvalidLogTrailerException("Bad magic. Got " + Bytes.toStringBinary(magic)
         + ", expected " + Bytes.toStringBinary(LogFileHeader.MAGIC));
     }
   }
@@ -171,33 +177,6 @@ public class LogFileTrailer implements LogFile.Trailer {
       + FIXED_TRAILER_SIZE;
   }
 
-  public static boolean isValidTrailer(final FileSystem fs, final Path path) throws IOException {
-    try (FSDataInputStream in = fs.open(path)) {
-      return isValidTrailer(in, fs.getFileStatus(path).getLen());
-    }
-  }
-
-  public static boolean isValidTrailer(FSDataInputStream in, long length) throws IOException {
-    long offset = length - VERSION_AND_MAGIC_SIZE;
-    if (offset < 0) {
-      return false;
-    }
-    in.seek(offset);
-    byte[] magic = new byte[LogFileHeader.MAGIC.length];
-    in.readFully(magic);
-    if (!Arrays.equals(LogFileHeader.MAGIC, magic)) {
-      return false;
-    }
-    int majorVersion = in.readByte();
-    in.readByte(); // minorVersion, for now we don't use it
-    // Basic version check for now. We assume semver conventions where only higher major
-    // versions may be incompatible.
-    if (majorVersion > LogFileHeader.VERSION_MAJOR) {
-      return false;
-    }
-    return true;
-  }
-
   @Override
   public String toString() {
     return "LogFileTrailer [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index ca3c4cfe97..062bef68bd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -553,8 +553,11 @@ public class ReplicationLogProcessorTestIT extends ParallelStatsDisabledIT {
     writer.append(tableNameString, 1, put);
     writer.sync();
 
+    // For processing of an unclosed file to work, we need to disable trailer validation
+    Configuration testConf = new Configuration(conf);
+    testConf.setBoolean(LogFileReaderContext.LOGFILE_VALIDATE_TRAILER, false);
     ReplicationLogProcessor spyProcessor =
-      Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
+      Mockito.spy(new ReplicationLogProcessor(testConf, testHAGroupName));
 
     // Create argument captor to capture the actual parameters passed to processReplicationLogBatch
     ArgumentCaptor<Map<TableName, List<Mutation>>> mapCaptor = ArgumentCaptor.forClass(Map.class);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
index 885f1e35a8..0d473cfd65 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
@@ -294,6 +294,9 @@ public class LogFileFormatTest {
     byte[] data = writerBaos.toByteArray();
     byte[] truncatedData = Arrays.copyOf(data, (int) truncationPoint);
 
+    // We truncated the final block, so the trailer is gone too.
+    readerContext.setValidateTrailer(false);
+
     initLogFileReader(truncatedData);
 
     List<LogFile.Record> decoded = new ArrayList<>();
@@ -339,6 +342,7 @@ public class LogFileFormatTest {
     LogFileTestUtil.SeekableByteArrayInputStream input =
       new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData);
     readerContext.setFileSize(truncatedData.length);
+    readerContext.setValidateTrailer(false);
     // This init should log a warning but succeed
     reader.init(readerContext, input);
 
@@ -373,6 +377,7 @@ public class LogFileFormatTest {
     LogFileTestUtil.SeekableByteArrayInputStream input =
       new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData);
     readerContext.setFileSize(truncatedData.length);
+    readerContext.setValidateTrailer(false);
     // Init should log a warning but succeed by ignoring the trailer
     reader.init(readerContext, input);
 
@@ -401,6 +406,88 @@ public class LogFileFormatTest {
     assertEquals("Records read count mismatch", totalRecords, readerContext.getRecordsRead());
   }
 
+  @Test
+  public void testFailIfMissingHeader() throws IOException {
+    // Zero length file
+    byte[] data = new byte[0];
+    LogFileTestUtil.SeekableByteArrayInputStream input =
+      new LogFileTestUtil.SeekableByteArrayInputStream(data);
+    readerContext.setFileSize(data.length);
+    readerContext.setValidateTrailer(false);
+    try {
+      reader.init(readerContext, input);
+      fail("Expected InvalidLogHeaderException for zero length file");
+    } catch (InvalidLogHeaderException e) {
+      assertTrue("Exception message should contain 'Short magic'",
+        e.getMessage().contains("Short magic"));
+    }
+  }
+
+  @Test
+  public void testFailIfInvalidHeader() throws IOException {
+    initLogFileWriter();
+    writer.close(); // Writes valid trailer
+    byte[] data = writerBaos.toByteArray();
+    LogFileTestUtil.SeekableByteArrayInputStream input =
+      new LogFileTestUtil.SeekableByteArrayInputStream(data);
+    readerContext.setFileSize(data.length);
+    readerContext.setValidateTrailer(true);
+    data[0] = (byte) 'X'; // Corrupt the first magic byte
+    try {
+      reader.init(readerContext, input);
+      fail("Expected InvalidLogHeaderException for file with corrupted header magic");
+    } catch (InvalidLogHeaderException e) {
+      assertTrue("Exception message should contain 'Bad magic'",
+        e.getMessage().contains("Bad magic"));
+    }
+  }
+
+  @Test
+  public void testFailIfMissingTrailer() throws IOException {
+    initLogFileWriter();
+    writeBlock(writer, "B1", 0, 5);
+    // Don't close the writer, simulate missing trailer
+    byte[] data = writerBaos.toByteArray();
+    // Re-initialize reader with truncated data and trailer validation enabled
+    LogFileTestUtil.SeekableByteArrayInputStream input =
+      new LogFileTestUtil.SeekableByteArrayInputStream(data);
+    readerContext.setFileSize(data.length);
+    // Enable trailer validation
+    readerContext.setValidateTrailer(true);
+    try {
+      reader.init(readerContext, input);
+      fail("Expected InvalidLogTrailerException when trailer is missing");
+    } catch (InvalidLogTrailerException e) {
+      assertTrue("Exception message should contain 'Unsupported version'",
+        e.getMessage().contains("Unsupported version"));
+    }
+  }
+
+  @Test
+  public void testFailIfInvalidTrailer() throws IOException {
+    initLogFileWriter();
+    writeBlock(writer, "B1", 0, 5);
+    writer.close(); // Writes valid trailer
+    byte[] data = writerBaos.toByteArray();
+    // Corrupt the trailer by changing the magic bytes
+    int trailerStartOffset = data.length - LogFileTrailer.FIXED_TRAILER_SIZE;
+    int magicOffset =
+      trailerStartOffset + LogFileTrailer.FIXED_TRAILER_SIZE - LogFileHeader.MAGIC.length;
+    data[magicOffset] = (byte) 'X'; // Corrupt the first magic byte
+    // Re-initialize reader with corrupted trailer and trailer validation enabled
+    LogFileTestUtil.SeekableByteArrayInputStream input =
+      new LogFileTestUtil.SeekableByteArrayInputStream(data);
+    readerContext.setFileSize(data.length);
+    readerContext.setValidateTrailer(true);
+    try {
+      reader.init(readerContext, input);
+      fail("Expected InvalidLogTrailerException when trailer magic is corrupt");
+    } catch (InvalidLogTrailerException e) {
+      assertTrue("Exception message should contain 'Bad magic'",
+        e.getMessage().contains("Bad magic"));
+    }
+  }
+
   @Test
   public void testLogFileCorruptionFirstBlockChecksum() throws IOException {
     initLogFileWriter();
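
A minimal error-handling sketch to accompany the tests above (not part of the patch). It assumes a LogFileFormatReader "reader", a LogFileReaderContext "readerContext" with trailer validation enabled, and a seekable "input" set up as in the tests; LOG is an assumed slf4j-style logger, and the handling shown is only one possible caller policy.

    try {
      reader.init(readerContext, input);
    } catch (InvalidLogHeaderException e) {
      // Bad or truncated header: the file is not a usable replication log.
      LOG.error("Corrupt replication log header", e);
    } catch (InvalidLogTrailerException e) {
      // Header is fine but the trailer is missing or corrupt, typically an
      // unclosed file; a caller may retry with setValidateTrailer(false).
      LOG.warn("Replication log has no valid trailer, treating as unclosed", e);
    }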
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
index 36dde2fc83..95c6bb5133 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
@@ -137,6 +137,16 @@ public class LogFileWriterTest {
     reader.close();
   }
 
+  @Test
+  public void testHeaderWrittenImmediately() throws IOException {
+    // This should write header immediately
+    initLogFileWriter();
+    // Verify file exists and has content (header should be written)
+    assertTrue("File should exist after init", localFs.exists(filePath));
+    assertEquals("File should have header written", LogFileHeader.HEADERSIZE, writer.getLength());
+    writer.close();
+  }
+
   private void initLogFileReader() throws IOException {
     readerContext = new LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(filePath);
     reader.init(readerContext);
