This is an automated email from the ASF dual-hosted git repository. apurtell pushed a commit to branch PHOENIX-7562-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push: new 5f1c8433ab PHOENIX-7669 Provide more control over log file validation (#2231) 5f1c8433ab is described below commit 5f1c8433abb4d3df8fcf763b8bfd2c0ae5f4649f Author: Andrew Purtell <apurt...@apache.org> AuthorDate: Mon Jul 21 09:42:21 2025 -0700 PHOENIX-7669 Provide more control over log file validation (#2231) --- .../replication/log/InvalidLogHeaderException.java | 31 ++++++++ .../log/InvalidLogTrailerException.java | 30 ++++++++ .../apache/phoenix/replication/log/LogFile.java | 25 +++++-- .../replication/log/LogFileFormatReader.java | 19 +++-- .../replication/log/LogFileFormatWriter.java | 30 ++------ .../phoenix/replication/log/LogFileHeader.java | 51 ++++--------- .../phoenix/replication/log/LogFileReader.java | 3 +- .../replication/log/LogFileReaderContext.java | 21 +++++- .../phoenix/replication/log/LogFileTrailer.java | 57 +++++--------- .../phoenix/replication/log/LogFileFormatTest.java | 87 ++++++++++++++++++++++ .../phoenix/replication/log/LogFileWriterTest.java | 10 +++ .../reader/ReplicationLogProcessorTest.java | 5 +- 12 files changed, 250 insertions(+), 119 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java new file mode 100644 index 0000000000..1b2284732d --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.IOException; + +/** Exception thrown when a log file has an invalid header. */ +public class InvalidLogHeaderException extends IOException { + private static final long serialVersionUID = 1L; + + public InvalidLogHeaderException(String message) { + super(message); + } + +} + diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java new file mode 100644 index 0000000000..61cfbd3d38 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.IOException; + +/** Exception thrown when a log file has an invalid trailer. */ +public class InvalidLogTrailerException extends IOException { + private static final long serialVersionUID = 1L; + + public InvalidLogTrailerException(String message) { + super(message); + } + +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java index 419313bd7e..f84d316234 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java @@ -23,6 +23,7 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -417,21 +418,31 @@ public interface LogFile { /** * Utility for determining if a file is a valid replication log file. + * @param conf The Configuration * @param fs The FileSystem * @param path Path to the potential replication log file + * @param validateTrailer Whether to validate the trailer * @return true if the file is a valid replication log file, false otherwise * @throws IOException if an I/O problem was encountered */ - static boolean isValidLogFile(final FileSystem fs, final Path path) throws IOException { + static boolean isValidLogFile(final Configuration conf, final FileSystem fs, final Path path, + final boolean validateTrailer) throws IOException { long length = fs.getFileStatus(path).getLen(); try (FSDataInputStream in = fs.open(path)) { - if (LogFileTrailer.isValidTrailer(in, length)) { - return true; - } else { - // Not a valid trailer, do we need to do something (set a flag)? - // Fall back to checking the header. - return LogFileHeader.isValidHeader(in); + // Check if the file is too short to be a valid log file. + if (length < LogFileHeader.HEADERSIZE) { + return false; } + try (LogFileFormatReader reader = new LogFileFormatReader()) { + LogFileReaderContext context = new LogFileReaderContext(conf) + .setFilePath(path) + .setFileSize(length) + .setValidateTrailer(validateTrailer); + reader.init(context, (SeekableDataInput) in); + } catch (InvalidLogHeaderException | InvalidLogTrailerException e) { + return false; + } + return true; } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java index 194abe7f87..9d0f17a2f2 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java @@ -48,7 +48,6 @@ public class LogFileFormatReader implements Closeable { private ByteBuffer currentBlockBuffer; private long currentBlockDataBytes; private long currentBlockConsumedBytes; - private boolean trailerValidated; private CRC64 crc = new CRC64(); public LogFileFormatReader() { @@ -62,12 +61,14 @@ public class LogFileFormatReader implements Closeable { this.currentBlockConsumedBytes = 0; try { readAndValidateTrailer(); - trailerValidated = true; } catch (IOException e) { + // If we are validating the trailer, we cannot proceed without it. + if (context.isValidateTrailer()) { + throw e; + } // Log warning, trailer might be missing or corrupt, proceed without it - LOG.warn("Failed to read or validate Log trailer for path: " - + (context != null ? context.getFilePath() : "unknown") - + ". Proceeding without trailer.", e); + LOG.warn("Failed to validate Log trailer for " + context.getFilePath() + + ", proceeding", e); trailer = null; // Ensure trailer is null if reading/validation failed } this.decoder = null; @@ -78,8 +79,7 @@ public class LogFileFormatReader implements Closeable { private void readAndValidateTrailer() throws IOException { if (context.getFileSize() < LogFileTrailer.FIXED_TRAILER_SIZE) { - throw new IOException("File size " + context.getFileSize() - + " is smaller than the fixed trailer size " + LogFileTrailer.FIXED_TRAILER_SIZE); + throw new InvalidLogTrailerException("Short file"); } LogFileTrailer ourTrailer = new LogFileTrailer(); // Fixed trailer fields will be LogTrailer.FIXED_TRAILER_SIZE bytes back from end of file. @@ -343,7 +343,7 @@ public class LogFileFormatReader implements Closeable { // Validates read counts against trailer counts if trailer was successfully read private void validateReadCounts() { - if (!trailerValidated || trailer == null) { + if (trailer == null) { return; } if (trailer.getBlockCount() != context.getBlocksRead()) { @@ -374,8 +374,7 @@ public class LogFileFormatReader implements Closeable { + input + ", header=" + header + ", trailer=" + trailer + ", currentPosition=" + currentPosition + ", currentBlockBuffer=" + currentBlockBuffer + ", currentBlockUncompressedSize=" + currentBlockDataBytes - + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes - + ", trailerValidated=" + trailerValidated + "]"; + + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes + "]"; } LogFile.Header getHeader() { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java index 7e7f3c1931..d2778478ea 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java @@ -41,8 +41,6 @@ public class LogFileFormatWriter implements Closeable { private SyncableDataOutput output; private ByteArrayOutputStream currentBlockBytes; private DataOutputStream blockDataStream; - private boolean headerWritten = false; - private boolean trailerWritten = false; private long recordCount = 0; private long blockCount = 0; private long blocksStartOffset = -1; @@ -60,15 +58,14 @@ public class LogFileFormatWriter implements Closeable { this.currentBlockBytes = new ByteArrayOutputStream(); this.blockDataStream = new DataOutputStream(currentBlockBytes); this.encoder = context.getCodec().getEncoder(blockDataStream); + // Write header immediately when file is created + writeFileHeader(); } private void writeFileHeader() throws IOException { - if (!headerWritten) { - LogFileHeader header = new LogFileHeader(); - header.write(output); - blocksStartOffset = output.getPos(); // First block starts after header - headerWritten = true; - } + LogFileHeader header = new LogFileHeader(); + header.write(output); + blocksStartOffset = output.getPos(); // First block starts after header } public long getBlocksStartOffset() { @@ -76,13 +73,6 @@ public class LogFileFormatWriter implements Closeable { } public void append(LogFile.Record record) throws IOException { - if (!headerWritten) { - // Lazily write file header - writeFileHeader(); - } - if (trailerWritten) { - throw new IOException("Cannot append record after trailer has been written"); - } if (blockDataStream == null) { startBlock(); // Start the block if needed } @@ -188,15 +178,10 @@ public class LogFileFormatWriter implements Closeable { @Override public void close() throws IOException { - // We use the fact we have already written the trailer as the boolean "closed" condition. - if (trailerWritten) { + if (output == null) { return; } try { - // We might be closing an empty file, handle this case correctly. - if (!headerWritten) { - writeFileHeader(); - } // Close any outstanding block. closeBlock(); // After we write the trailer we consider the file closed. @@ -209,6 +194,7 @@ public class LogFileFormatWriter implements Closeable { } catch (IOException e) { LOG.error("Exception while closing LogFormatWriter", e); } + output = null; } } } @@ -220,7 +206,6 @@ public class LogFileFormatWriter implements Closeable { .setBlocksStartOffset(blocksStartOffset) .setTrailerStartOffset(output.getPos()); trailer.write(output); - trailerWritten = true; try { output.sync(); } catch (IOException e) { @@ -233,7 +218,6 @@ public class LogFileFormatWriter implements Closeable { public String toString() { return "LogFileFormatWriter [writerContext=" + context + ", currentBlockUncompressedBytes=" + currentBlockBytes - + ", headerWritten=" + headerWritten + ", trailerWritten=" + trailerWritten + ", recordCount=" + recordCount + ", blockCount=" + blockCount + ", blocksStartOffset=" + blocksStartOffset + "]"; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java index bfc07b76d9..4fc85edabf 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java @@ -19,12 +19,10 @@ package org.apache.phoenix.replication.log; import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Bytes; public class LogFileHeader implements LogFile.Header { @@ -36,7 +34,7 @@ public class LogFileHeader implements LogFile.Header { /** Current minor version of the replication log format */ static final int VERSION_MINOR = 0; - static final int HEADERSIZE = MAGIC.length + 3 * Bytes.SIZEOF_BYTE; + static final int HEADERSIZE = MAGIC.length + 2 * Bytes.SIZEOF_BYTE; private int majorVersion = VERSION_MAJOR; private int minorVersion = VERSION_MINOR; @@ -70,17 +68,25 @@ public class LogFileHeader implements LogFile.Header { @Override public void readFields(DataInput in) throws IOException { byte[] magic = new byte[MAGIC.length]; - in.readFully(magic); + try { + in.readFully(magic); + } catch (EOFException e) { + throw (IOException) new InvalidLogHeaderException("Short magic").initCause(e); + } if (!Arrays.equals(MAGIC, magic)) { - throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic) + throw new InvalidLogHeaderException("Bad magic. Got " + Bytes.toStringBinary(magic) + ", expected " + Bytes.toStringBinary(MAGIC)); } - majorVersion = in.readByte(); - minorVersion = in.readByte(); + try { + majorVersion = in.readByte(); + minorVersion = in.readByte(); + } catch (EOFException e) { + throw (IOException) new InvalidLogHeaderException("Short version").initCause(e); + } // Basic version check for now. We assume semver conventions where only higher major // versions may be incompatible. if (majorVersion > VERSION_MAJOR) { - throw new IOException("Unsupported LogFile version. Got major=" + majorVersion + throw new InvalidLogHeaderException("Unsupported version. Got major=" + majorVersion + " minor=" + minorVersion + ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR); } @@ -98,33 +104,6 @@ public class LogFileHeader implements LogFile.Header { return HEADERSIZE; } - public static boolean isValidHeader(final FileSystem fs, final Path path) - throws IOException { - if (fs.getFileStatus(path).getLen() < HEADERSIZE) { - return false; - } - try (FSDataInputStream in = fs.open(path)) { - return isValidHeader(in); - } - } - - public static boolean isValidHeader(FSDataInputStream in) throws IOException { - in.seek(0); - byte[] magic = new byte[MAGIC.length]; - in.readFully(magic); - if (!Arrays.equals(MAGIC, magic)) { - return false; - } - int majorVersion = in.readByte(); - in.readByte(); // minorVersion, for now we don't use it - // Basic version check for now. We assume semver conventions where only higher major - // versions may be incompatible. - if (majorVersion > VERSION_MAJOR) { - return false; - } - return true; - } - @Override public String toString() { return "LogFileHeader [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java index 6b3fa028b2..1961922b7a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java @@ -137,7 +137,8 @@ public class LogFileReader implements LogFile.Reader { throw e; } finally { closed = true; - LOG.debug("Closed LogFileReader for path {}", context.getFilePath()); + LOG.debug("Closed LogFileReader for path {}", context != null ? context.getFilePath() + : "null"); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java index 52fbdb0cbb..a2c6de3488 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java @@ -36,12 +36,17 @@ public class LogFileReaderContext { /** Default for skipping corrupt blocks */ public static final boolean DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS = true; + public static final String LOGFILE_VALIDATE_TRAILER = + "phoenix.replication.logfile.validate.trailer"; + public static final boolean DEFAULT_LOGFILE_VALIDATE_TRAILER = true; + private final Configuration conf; private FileSystem fs; private Path path; private LogFileCodec codec; private long fileSize = -1; private boolean isSkipCorruptBlocks; + private boolean isValidateTrailer; private long blocksRead; private long recordsRead; private long corruptBlocksSkipped; @@ -50,6 +55,8 @@ public class LogFileReaderContext { this.conf = conf; this.isSkipCorruptBlocks = conf.getBoolean(LOGFILE_SKIP_CORRUPT_BLOCKS, DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS); + this.isValidateTrailer = conf.getBoolean(LOGFILE_VALIDATE_TRAILER, + DEFAULT_LOGFILE_VALIDATE_TRAILER); // Note: When we have multiple codec types, instantiate the appropriate type based on // configuration; this.codec = new LogFileCodec(); @@ -146,11 +153,21 @@ public class LogFileReaderContext { return this; } + public boolean isValidateTrailer() { + return isValidateTrailer; + } + + public LogFileReaderContext setValidateTrailer(boolean validateTrailer) { + this.isValidateTrailer = validateTrailer; + return this; + } + @Override public String toString() { return "LogFileReaderContext [filePath=" + path + ", fileSize=" + fileSize - + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", codec=" + codec + ", blocksRead=" - + blocksRead + ", recordsRead=" + recordsRead + ", corruptBlocksSkipped=" + + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", isValidateTrailer=" + + isValidateTrailer + ", codec=" + codec + ", blocksRead=" + blocksRead + + ", recordsRead=" + recordsRead + ", corruptBlocksSkipped=" + corruptBlocksSkipped + "]"; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java index b05da86aae..8e29bf288a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java @@ -19,12 +19,10 @@ package org.apache.phoenix.replication.log; import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Bytes; public class LogFileTrailer implements LogFile.Trailer { @@ -113,23 +111,31 @@ public class LogFileTrailer implements LogFile.Trailer { } public void readFixedFields(DataInput in) throws IOException { - this.recordCount = in.readLong(); - this.blockCount = in.readLong(); - this.blocksStartOffset = in.readLong(); - this.trailerStartOffset = in.readLong(); - this.majorVersion = in.readByte(); - this.minorVersion = in.readByte(); + try { + this.recordCount = in.readLong(); + this.blockCount = in.readLong(); + this.blocksStartOffset = in.readLong(); + this.trailerStartOffset = in.readLong(); + this.majorVersion = in.readByte(); + this.minorVersion = in.readByte(); + } catch (EOFException e) { + throw (IOException) new InvalidLogTrailerException("Short fixed fields").initCause(e); + } // Basic version check for now. We assume semver conventions where only higher major // versions may be incompatible. if (majorVersion > LogFileHeader.VERSION_MAJOR) { - throw new IOException("Unsupported LogFile version. Got major=" + majorVersion + throw new InvalidLogTrailerException("Unsupported version. Got major=" + majorVersion + " minor=" + minorVersion + ", expected major=" + LogFileHeader.VERSION_MAJOR + " minor=" + LogFileHeader.VERSION_MINOR); } byte[] magic = new byte[LogFileHeader.MAGIC.length]; - in.readFully(magic); + try { + in.readFully(magic); + } catch (EOFException e) { + throw (IOException) new InvalidLogTrailerException("Short magic").initCause(e); + } if (!Arrays.equals(LogFileHeader.MAGIC, magic)) { - throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic) + throw new InvalidLogTrailerException("Bad magic. Got " + Bytes.toStringBinary(magic) + ", expected " + Bytes.toStringBinary(LogFileHeader.MAGIC)); } } @@ -172,33 +178,6 @@ public class LogFileTrailer implements LogFile.Trailer { + FIXED_TRAILER_SIZE; } - public static boolean isValidTrailer(final FileSystem fs, final Path path) throws IOException { - try (FSDataInputStream in = fs.open(path)) { - return isValidTrailer(in, fs.getFileStatus(path).getLen()); - } - } - - public static boolean isValidTrailer(FSDataInputStream in, long length) throws IOException { - long offset = length - VERSION_AND_MAGIC_SIZE; - if (offset < 0) { - return false; - } - in.seek(offset); - byte[] magic = new byte[LogFileHeader.MAGIC.length]; - in.readFully(magic); - if (!Arrays.equals(LogFileHeader.MAGIC, magic)) { - return false; - } - int majorVersion = in.readByte(); - in.readByte(); // minorVersion, for now we don't use it - // Basic version check for now. We assume semver conventions where only higher major - // versions may be incompatible. - if (majorVersion > LogFileHeader.VERSION_MAJOR) { - return false; - } - return true; - } - @Override public String toString() { return "LogFileTrailer [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java index 875cb6ea1b..f623fb9f0a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java @@ -296,6 +296,9 @@ public class LogFileFormatTest { byte[] data = writerBaos.toByteArray(); byte[] truncatedData = Arrays.copyOf(data, (int) truncationPoint); + // We truncated the final block, so the trailer is gone too. + readerContext.setValidateTrailer(false); + initLogFileReader(truncatedData); List<LogFile.Record> decoded = new ArrayList<>(); @@ -342,6 +345,7 @@ public class LogFileFormatTest { LogFileTestUtil.SeekableByteArrayInputStream input = new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData); readerContext.setFileSize(truncatedData.length); + readerContext.setValidateTrailer(false); // This init should log a warning but succeed reader.init(readerContext, input); @@ -377,6 +381,7 @@ public class LogFileFormatTest { LogFileTestUtil.SeekableByteArrayInputStream input = new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData); readerContext.setFileSize(truncatedData.length); + readerContext.setValidateTrailer(false); // Init should log a warning but succeed by ignoring the trailer reader.init(readerContext, input); @@ -406,6 +411,88 @@ public class LogFileFormatTest { assertEquals("Records read count mismatch", totalRecords, readerContext.getRecordsRead()); } + @Test + public void testFailIfMissingHeader() throws IOException { + // Zero length file + byte[] data = new byte[0]; + LogFileTestUtil.SeekableByteArrayInputStream input = + new LogFileTestUtil.SeekableByteArrayInputStream(data); + readerContext.setFileSize(data.length); + readerContext.setValidateTrailer(false); + try { + reader.init(readerContext, input); + fail("Expected InvalidLogHeaderException for zero length file"); + } catch (InvalidLogHeaderException e) { + assertTrue("Exception message should contain 'Short magic'", + e.getMessage().contains("Short magic")); + } + } + + @Test + public void testFailIfInvalidHeader() throws IOException { + initLogFileWriter(); + writer.close(); // Writes valid trailer + byte[] data = writerBaos.toByteArray(); + LogFileTestUtil.SeekableByteArrayInputStream input = + new LogFileTestUtil.SeekableByteArrayInputStream(data); + readerContext.setFileSize(data.length); + readerContext.setValidateTrailer(true); + data[0] = (byte) 'X'; // Corrupt the first magic byte + try { + reader.init(readerContext, input); + fail("Expected InvalidLogHeaderException for file with corrupted header magic"); + } catch (InvalidLogHeaderException e) { + assertTrue("Exception message should contain 'Bad magic'", + e.getMessage().contains("Bad magic")); + } + } + + @Test + public void testFailIfMissingTrailer() throws IOException { + initLogFileWriter(); + writeBlock(writer, "B1", 0, 5); + // Don't close the writer, simulate missing trailer + byte[] data = writerBaos.toByteArray(); + // Re-initialize reader with truncated data and trailer validation enabled + LogFileTestUtil.SeekableByteArrayInputStream input = + new LogFileTestUtil.SeekableByteArrayInputStream(data); + readerContext.setFileSize(data.length); + // Enable trailer validation + readerContext.setValidateTrailer(true); + try { + reader.init(readerContext, input); + fail("Expected InvalidLogTrailerException when trailer is missing"); + } catch (InvalidLogTrailerException e) { + assertTrue("Exception message should contain 'Unsupported version'", + e.getMessage().contains("Unsupported version")); + } + } + + @Test + public void testFailIfInvalidTrailer() throws IOException { + initLogFileWriter(); + writeBlock(writer, "B1", 0, 5); + writer.close(); // Writes valid trailer + byte[] data = writerBaos.toByteArray(); + // Corrupt the trailer by changing the magic bytes + int trailerStartOffset = data.length - LogFileTrailer.FIXED_TRAILER_SIZE; + int magicOffset = trailerStartOffset + LogFileTrailer.FIXED_TRAILER_SIZE + - LogFileHeader.MAGIC.length; + data[magicOffset] = (byte) 'X'; // Corrupt the first magic byte + // Re-initialize reader with corrupted trailer and trailer validation enabled + LogFileTestUtil.SeekableByteArrayInputStream input = + new LogFileTestUtil.SeekableByteArrayInputStream(data); + readerContext.setFileSize(data.length); + readerContext.setValidateTrailer(true); + try { + reader.init(readerContext, input); + fail("Expected InvalidLogTrailerException when trailer magic is corrupt"); + } catch (InvalidLogTrailerException e) { + assertTrue("Exception message should contain 'Bad magic'", + e.getMessage().contains("Bad magic")); + } + } + @Test public void testLogFileCorruptionFirstBlockChecksum() throws IOException { initLogFileWriter(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java index 55c4737223..b4e828a3b6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java @@ -137,6 +137,16 @@ public class LogFileWriterTest { reader.close(); } + @Test + public void testHeaderWrittenImmediately() throws IOException { + // This should write header immediately + initLogFileWriter(); + // Verify file exists and has content (header should be written) + assertTrue("File should exist after init", localFs.exists(filePath)); + assertEquals("File should have header written", LogFileHeader.HEADERSIZE, writer.getLength()); + writer.close(); + } + private void initLogFileReader() throws IOException { readerContext = new LogFileReaderContext(conf).setFileSystem(localFs) .setFilePath(filePath); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java index e97cd027e9..2bbc18a35d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java @@ -500,7 +500,10 @@ public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT { writer.append(tableNameString, 1, put); writer.sync(); - ReplicationLogProcessor spyProcessor = Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName)); + // For processing of an unclosed file to work, we need to disable trailer validation + Configuration testConf = new Configuration(conf); + testConf.setBoolean(LogFileReaderContext.LOGFILE_VALIDATE_TRAILER, false); + ReplicationLogProcessor spyProcessor = Mockito.spy(new ReplicationLogProcessor(testConf, testHAGroupName)); // Create argument captor to capture the actual parameters passed to processReplicationLogBatch ArgumentCaptor<Map<TableName, List<Mutation>>> mapCaptor =