This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 755a47a7cf PHOENIX-7669 Enhance Header and Trailer validations to
gracefully handle unclosed files (#2341)
755a47a7cf is described below
commit 755a47a7cf4d1f0cf68029be54979d36a36835e5
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sat Jan 10 05:58:12 2026 +0530
PHOENIX-7669 Enhance Header and Trailer validations to gracefully handle
unclosed files (#2341)
Co-authored-by: Himanshu Gwalani
<[email protected]>
---
.../replication/log/InvalidLogHeaderException.java | 30 ++++++++
.../log/InvalidLogTrailerException.java | 30 ++++++++
.../apache/phoenix/replication/log/LogFile.java | 27 ++++---
.../replication/log/LogFileFormatReader.java | 19 ++---
.../replication/log/LogFileFormatWriter.java | 32 ++------
.../phoenix/replication/log/LogFileHeader.java | 53 +++++--------
.../phoenix/replication/log/LogFileReader.java | 3 +-
.../replication/log/LogFileReaderContext.java | 22 +++++-
.../phoenix/replication/log/LogFileTrailer.java | 59 +++++----------
.../reader/ReplicationLogProcessorTestIT.java | 5 +-
.../phoenix/replication/log/LogFileFormatTest.java | 87 ++++++++++++++++++++++
.../phoenix/replication/log/LogFileWriterTest.java | 10 +++
12 files changed, 252 insertions(+), 125 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
new file mode 100644
index 0000000000..500b8b003f
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid header. */
+public class InvalidLogHeaderException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidLogHeaderException(String message) {
+ super(message);
+ }
+
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
new file mode 100644
index 0000000000..4b528d1be9
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid trailer. */
+public class InvalidLogTrailerException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidLogTrailerException(String message) {
+ super(message);
+ }
+
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
index 631146c4aa..b1f05f4af9 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -418,21 +419,29 @@ public interface LogFile {
/**
* Utility for determining if a file is a valid replication log file.
- * @param fs The FileSystem
- * @param path Path to the potential replication log file
+ * @param conf The Configuration
+ * @param fs The FileSystem
+ * @param path Path to the potential replication log file
+ * @param validateTrailer Whether to validate the trailer
* @return true if the file is a valid replication log file, false otherwise
* @throws IOException if an I/O problem was encountered
*/
- static boolean isValidLogFile(final FileSystem fs, final Path path) throws
IOException {
+ static boolean isValidLogFile(final Configuration conf, final FileSystem fs,
final Path path,
+ final boolean validateTrailer) throws IOException {
long length = fs.getFileStatus(path).getLen();
try (FSDataInputStream in = fs.open(path)) {
- if (LogFileTrailer.isValidTrailer(in, length)) {
- return true;
- } else {
- // Not a valid trailer, do we need to do something (set a flag)?
- // Fall back to checking the header.
- return LogFileHeader.isValidHeader(in);
+ // Check if the file is too short to be a valid log file.
+ if (length < LogFileHeader.HEADERSIZE) {
+ return false;
}
+ try (LogFileFormatReader reader = new LogFileFormatReader()) {
+ LogFileReaderContext context = new
LogFileReaderContext(conf).setFilePath(path)
+ .setFileSize(length).setValidateTrailer(validateTrailer);
+ reader.init(context, (SeekableDataInput) in);
+ } catch (InvalidLogHeaderException | InvalidLogTrailerException e) {
+ return false;
+ }
+ return true;
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
index 2350ba7c7a..6fbdf6d645 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
@@ -47,7 +47,6 @@ public class LogFileFormatReader implements Closeable {
private ByteBuffer currentBlockBuffer;
private long currentBlockDataBytes;
private long currentBlockConsumedBytes;
- private boolean trailerValidated;
private CRC64 crc = new CRC64();
public LogFileFormatReader() {
@@ -61,13 +60,13 @@ public class LogFileFormatReader implements Closeable {
this.currentBlockConsumedBytes = 0;
try {
readAndValidateTrailer();
- trailerValidated = true;
} catch (IOException e) {
+ // If we are validating the trailer, we cannot proceed without it.
+ if (context.isValidateTrailer()) {
+ throw e;
+ }
// Log warning, trailer might be missing or corrupt, proceed without it
- LOG.warn(
- "Failed to read or validate Log trailer for path: "
- + (context != null ? context.getFilePath() : "unknown") + ".
Proceeding without trailer.",
- e);
+ LOG.warn("Failed to validate Log trailer for " + context.getFilePath() +
", proceeding", e);
trailer = null; // Ensure trailer is null if reading/validation failed
}
this.decoder = null;
@@ -78,8 +77,7 @@ public class LogFileFormatReader implements Closeable {
private void readAndValidateTrailer() throws IOException {
if (context.getFileSize() < LogFileTrailer.FIXED_TRAILER_SIZE) {
- throw new IOException("File size " + context.getFileSize()
- + " is smaller than the fixed trailer size " +
LogFileTrailer.FIXED_TRAILER_SIZE);
+ throw new InvalidLogTrailerException("Short file");
}
LogFileTrailer ourTrailer = new LogFileTrailer();
// Fixed trailer fields will be LogTrailer.FIXED_TRAILER_SIZE bytes back from end of file.
@@ -337,7 +335,7 @@ public class LogFileFormatReader implements Closeable {
// Validates read counts against trailer counts if trailer was successfully read
private void validateReadCounts() {
- if (!trailerValidated || trailer == null) {
+ if (trailer == null) {
return;
}
if (trailer.getBlockCount() != context.getBlocksRead()) {
@@ -367,8 +365,7 @@ public class LogFileFormatReader implements Closeable {
return "LogFileFormatReader [context=" + context + ", decoder=" + decoder
+ ", input=" + input
+ ", header=" + header + ", trailer=" + trailer + ", currentPosition=" +
currentPosition
+ ", currentBlockBuffer=" + currentBlockBuffer + ",
currentBlockUncompressedSize="
- + currentBlockDataBytes + ", currentBlockConsumedBytes=" +
currentBlockConsumedBytes
- + ", trailerValidated=" + trailerValidated + "]";
+ + currentBlockDataBytes + ", currentBlockConsumedBytes=" +
currentBlockConsumedBytes + "]";
}
LogFile.Header getHeader() {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index 30fab50bf4..be94364a80 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -40,8 +40,6 @@ public class LogFileFormatWriter implements Closeable {
private SyncableDataOutput output;
private ByteArrayOutputStream currentBlockBytes;
private DataOutputStream blockDataStream;
- private boolean headerWritten = false;
- private boolean trailerWritten = false;
private long recordCount = 0;
private long blockCount = 0;
private long blocksStartOffset = -1;
@@ -59,15 +57,14 @@ public class LogFileFormatWriter implements Closeable {
this.currentBlockBytes = new ByteArrayOutputStream();
this.blockDataStream = new DataOutputStream(currentBlockBytes);
this.encoder = context.getCodec().getEncoder(blockDataStream);
+ // Write header immediately when file is created
+ writeFileHeader();
}
private void writeFileHeader() throws IOException {
- if (!headerWritten) {
- LogFileHeader header = new LogFileHeader();
- header.write(output);
- blocksStartOffset = output.getPos(); // First block starts after header
- headerWritten = true;
- }
+ LogFileHeader header = new LogFileHeader();
+ header.write(output);
+ blocksStartOffset = output.getPos(); // First block starts after header
}
public long getBlocksStartOffset() {
@@ -75,13 +72,6 @@ public class LogFileFormatWriter implements Closeable {
}
public void append(LogFile.Record record) throws IOException {
- if (!headerWritten) {
- // Lazily write file header
- writeFileHeader();
- }
- if (trailerWritten) {
- throw new IOException("Cannot append record after trailer has been
written");
- }
if (blockDataStream == null) {
startBlock(); // Start the block if needed
}
@@ -185,15 +175,10 @@ public class LogFileFormatWriter implements Closeable {
@Override
public void close() throws IOException {
- // We use the fact we have already written the trailer as the boolean "closed" condition.
- if (trailerWritten) {
+ if (output == null) {
return;
}
try {
- // We might be closing an empty file, handle this case correctly.
- if (!headerWritten) {
- writeFileHeader();
- }
// Close any outstanding block.
closeBlock();
// After we write the trailer we consider the file closed.
@@ -206,6 +191,7 @@ public class LogFileFormatWriter implements Closeable {
} catch (IOException e) {
LOG.error("Exception while closing LogFormatWriter", e);
}
+ output = null;
}
}
}
@@ -215,7 +201,6 @@ public class LogFileFormatWriter implements Closeable {
new
LogFileTrailer().setRecordCount(recordCount).setBlockCount(blockCount)
.setBlocksStartOffset(blocksStartOffset).setTrailerStartOffset(output.getPos());
trailer.write(output);
- trailerWritten = true;
try {
output.sync();
} catch (IOException e) {
@@ -227,8 +212,7 @@ public class LogFileFormatWriter implements Closeable {
@Override
public String toString() {
return "LogFileFormatWriter [writerContext=" + context + ",
currentBlockUncompressedBytes="
- + currentBlockBytes + ", headerWritten=" + headerWritten + ",
trailerWritten="
- + trailerWritten + ", recordCount=" + recordCount + ", blockCount=" +
blockCount
+ + currentBlockBytes + ", recordCount=" + recordCount + ", blockCount=" +
blockCount
+ ", blocksStartOffset=" + blocksStartOffset + "]";
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
index c15260b3a5..fdaced70d3 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
@@ -19,11 +19,9 @@ package org.apache.phoenix.replication.log;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
public class LogFileHeader implements LogFile.Header {
@@ -35,7 +33,7 @@ public class LogFileHeader implements LogFile.Header {
/** Current minor version of the replication log format */
static final int VERSION_MINOR = 0;
- static final int HEADERSIZE = MAGIC.length + 3 * Bytes.SIZEOF_BYTE;
+ static final int HEADERSIZE = MAGIC.length + 2 * Bytes.SIZEOF_BYTE;
private int majorVersion = VERSION_MAJOR;
private int minorVersion = VERSION_MINOR;
@@ -69,18 +67,27 @@ public class LogFileHeader implements LogFile.Header {
@Override
public void readFields(DataInput in) throws IOException {
byte[] magic = new byte[MAGIC.length];
- in.readFully(magic);
+ try {
+ in.readFully(magic);
+ } catch (EOFException e) {
+ throw (IOException) new InvalidLogHeaderException("Short
magic").initCause(e);
+ }
if (!Arrays.equals(MAGIC, magic)) {
- throw new IOException("Invalid LogFile magic. Got " +
Bytes.toStringBinary(magic)
+ throw new InvalidLogHeaderException("Bad magic. Got " +
Bytes.toStringBinary(magic)
+ ", expected " + Bytes.toStringBinary(MAGIC));
}
- majorVersion = in.readByte();
- minorVersion = in.readByte();
+ try {
+ majorVersion = in.readByte();
+ minorVersion = in.readByte();
+ } catch (EOFException e) {
+ throw (IOException) new InvalidLogHeaderException("Short
version").initCause(e);
+ }
// Basic version check for now. We assume semver conventions where only higher major
// versions may be incompatible.
if (majorVersion > VERSION_MAJOR) {
- throw new IOException("Unsupported LogFile version. Got major=" +
majorVersion + " minor="
- + minorVersion + ", expected major=" + VERSION_MAJOR + " minor=" +
VERSION_MINOR);
+ throw new InvalidLogHeaderException(
+ "Unsupported version. Got major=" + majorVersion + " minor=" +
minorVersion
+ + ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR);
}
}
@@ -96,32 +103,6 @@ public class LogFileHeader implements LogFile.Header {
return HEADERSIZE;
}
- public static boolean isValidHeader(final FileSystem fs, final Path path)
throws IOException {
- if (fs.getFileStatus(path).getLen() < HEADERSIZE) {
- return false;
- }
- try (FSDataInputStream in = fs.open(path)) {
- return isValidHeader(in);
- }
- }
-
- public static boolean isValidHeader(FSDataInputStream in) throws IOException
{
- in.seek(0);
- byte[] magic = new byte[MAGIC.length];
- in.readFully(magic);
- if (!Arrays.equals(MAGIC, magic)) {
- return false;
- }
- int majorVersion = in.readByte();
- in.readByte(); // minorVersion, for now we don't use it
- // Basic version check for now. We assume semver conventions where only higher major
- // versions may be incompatible.
- if (majorVersion > VERSION_MAJOR) {
- return false;
- }
- return true;
- }
-
@Override
public String toString() {
return "LogFileHeader [majorVersion=" + majorVersion + ", minorVersion=" +
minorVersion + "]";
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
index 4e4bd4b67b..8b24f6c0c1 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
@@ -134,7 +134,8 @@ public class LogFileReader implements LogFile.Reader {
throw e;
} finally {
closed = true;
- LOG.debug("Closed LogFileReader for path {}", context.getFilePath());
+ LOG.debug("Closed LogFileReader for path {}",
+ context != null ? context.getFilePath() : "null");
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
index 5573df7599..8fbab2e8da 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
@@ -35,12 +35,17 @@ public class LogFileReaderContext {
/** Default for skipping corrupt blocks */
public static final boolean DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS = true;
+ public static final String LOGFILE_VALIDATE_TRAILER =
+ "phoenix.replication.logfile.validate.trailer";
+ public static final boolean DEFAULT_LOGFILE_VALIDATE_TRAILER = true;
+
private final Configuration conf;
private FileSystem fs;
private Path path;
private LogFileCodec codec;
private long fileSize = -1;
private boolean isSkipCorruptBlocks;
+ private boolean isValidateTrailer;
private long blocksRead;
private long recordsRead;
private long corruptBlocksSkipped;
@@ -49,6 +54,8 @@ public class LogFileReaderContext {
this.conf = conf;
this.isSkipCorruptBlocks =
conf.getBoolean(LOGFILE_SKIP_CORRUPT_BLOCKS,
DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS);
+ this.isValidateTrailer =
+ conf.getBoolean(LOGFILE_VALIDATE_TRAILER,
DEFAULT_LOGFILE_VALIDATE_TRAILER);
// Note: When we have multiple codec types, instantiate the appropriate type based on
// configuration;
this.codec = new LogFileCodec();
@@ -145,12 +152,21 @@ public class LogFileReaderContext {
return this;
}
+ public boolean isValidateTrailer() {
+ return isValidateTrailer;
+ }
+
+ public LogFileReaderContext setValidateTrailer(boolean validateTrailer) {
+ this.isValidateTrailer = validateTrailer;
+ return this;
+ }
+
@Override
public String toString() {
return "LogFileReaderContext [filePath=" + path + ", fileSize=" + fileSize
- + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", codec=" + codec +
", blocksRead="
- + blocksRead + ", recordsRead=" + recordsRead + ", corruptBlocksSkipped="
- + corruptBlocksSkipped + "]";
+ + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ",
isValidateTrailer=" + isValidateTrailer
+ + ", codec=" + codec + ", blocksRead=" + blocksRead + ", recordsRead=" +
recordsRead
+ + ", corruptBlocksSkipped=" + corruptBlocksSkipped + "]";
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
index 84433e0309..2f77d498d8 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
@@ -19,11 +19,9 @@ package org.apache.phoenix.replication.log;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
public class LogFileTrailer implements LogFile.Trailer {
@@ -112,23 +110,31 @@ public class LogFileTrailer implements LogFile.Trailer {
}
public void readFixedFields(DataInput in) throws IOException {
- this.recordCount = in.readLong();
- this.blockCount = in.readLong();
- this.blocksStartOffset = in.readLong();
- this.trailerStartOffset = in.readLong();
- this.majorVersion = in.readByte();
- this.minorVersion = in.readByte();
+ try {
+ this.recordCount = in.readLong();
+ this.blockCount = in.readLong();
+ this.blocksStartOffset = in.readLong();
+ this.trailerStartOffset = in.readLong();
+ this.majorVersion = in.readByte();
+ this.minorVersion = in.readByte();
+ } catch (EOFException e) {
+ throw (IOException) new InvalidLogTrailerException("Short fixed
fields").initCause(e);
+ }
// Basic version check for now. We assume semver conventions where only higher major
// versions may be incompatible.
if (majorVersion > LogFileHeader.VERSION_MAJOR) {
- throw new IOException("Unsupported LogFile version. Got major=" +
majorVersion + " minor="
- + minorVersion + ", expected major=" + LogFileHeader.VERSION_MAJOR + "
minor="
+ throw new InvalidLogTrailerException("Unsupported version. Got major=" +
majorVersion
+ + " minor=" + minorVersion + ", expected major=" +
LogFileHeader.VERSION_MAJOR + " minor="
+ LogFileHeader.VERSION_MINOR);
}
byte[] magic = new byte[LogFileHeader.MAGIC.length];
- in.readFully(magic);
+ try {
+ in.readFully(magic);
+ } catch (EOFException e) {
+ throw (IOException) new InvalidLogTrailerException("Short
magic").initCause(e);
+ }
if (!Arrays.equals(LogFileHeader.MAGIC, magic)) {
- throw new IOException("Invalid LogFile magic. Got " +
Bytes.toStringBinary(magic)
+ throw new InvalidLogTrailerException("Bad magic. Got " +
Bytes.toStringBinary(magic)
+ ", expected " + Bytes.toStringBinary(LogFileHeader.MAGIC));
}
}
@@ -171,33 +177,6 @@ public class LogFileTrailer implements LogFile.Trailer {
+ FIXED_TRAILER_SIZE;
}
- public static boolean isValidTrailer(final FileSystem fs, final Path path)
throws IOException {
- try (FSDataInputStream in = fs.open(path)) {
- return isValidTrailer(in, fs.getFileStatus(path).getLen());
- }
- }
-
- public static boolean isValidTrailer(FSDataInputStream in, long length)
throws IOException {
- long offset = length - VERSION_AND_MAGIC_SIZE;
- if (offset < 0) {
- return false;
- }
- in.seek(offset);
- byte[] magic = new byte[LogFileHeader.MAGIC.length];
- in.readFully(magic);
- if (!Arrays.equals(LogFileHeader.MAGIC, magic)) {
- return false;
- }
- int majorVersion = in.readByte();
- in.readByte(); // minorVersion, for now we don't use it
- // Basic version check for now. We assume semver conventions where only higher major
- // versions may be incompatible.
- if (majorVersion > LogFileHeader.VERSION_MAJOR) {
- return false;
- }
- return true;
- }
-
@Override
public String toString() {
return "LogFileTrailer [majorVersion=" + majorVersion + ", minorVersion="
+ minorVersion
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index ca3c4cfe97..062bef68bd 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -553,8 +553,11 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
writer.append(tableNameString, 1, put);
writer.sync();
+ // For processing of an unclosed file to work, we need to disable trailer validation
+ Configuration testConf = new Configuration(conf);
+ testConf.setBoolean(LogFileReaderContext.LOGFILE_VALIDATE_TRAILER, false);
ReplicationLogProcessor spyProcessor =
- Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
+ Mockito.spy(new ReplicationLogProcessor(testConf, testHAGroupName));
// Create argument captor to capture the actual parameters passed to processReplicationLogBatch
ArgumentCaptor<Map<TableName, List<Mutation>>> mapCaptor =
ArgumentCaptor.forClass(Map.class);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
index 885f1e35a8..0d473cfd65 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
@@ -294,6 +294,9 @@ public class LogFileFormatTest {
byte[] data = writerBaos.toByteArray();
byte[] truncatedData = Arrays.copyOf(data, (int) truncationPoint);
+ // We truncated the final block, so the trailer is gone too.
+ readerContext.setValidateTrailer(false);
+
initLogFileReader(truncatedData);
List<LogFile.Record> decoded = new ArrayList<>();
@@ -339,6 +342,7 @@ public class LogFileFormatTest {
LogFileTestUtil.SeekableByteArrayInputStream input =
new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData);
readerContext.setFileSize(truncatedData.length);
+ readerContext.setValidateTrailer(false);
// This init should log a warning but succeed
reader.init(readerContext, input);
@@ -373,6 +377,7 @@ public class LogFileFormatTest {
LogFileTestUtil.SeekableByteArrayInputStream input =
new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData);
readerContext.setFileSize(truncatedData.length);
+ readerContext.setValidateTrailer(false);
// Init should log a warning but succeed by ignoring the trailer
reader.init(readerContext, input);
@@ -401,6 +406,88 @@ public class LogFileFormatTest {
assertEquals("Records read count mismatch", totalRecords,
readerContext.getRecordsRead());
}
+ @Test
+ public void testFailIfMissingHeader() throws IOException {
+ // Zero length file
+ byte[] data = new byte[0];
+ LogFileTestUtil.SeekableByteArrayInputStream input =
+ new LogFileTestUtil.SeekableByteArrayInputStream(data);
+ readerContext.setFileSize(data.length);
+ readerContext.setValidateTrailer(false);
+ try {
+ reader.init(readerContext, input);
+ fail("Expected InvalidLogHeaderException for zero length file");
+ } catch (InvalidLogHeaderException e) {
+ assertTrue("Exception message should contain 'Short magic'",
+ e.getMessage().contains("Short magic"));
+ }
+ }
+
+ @Test
+ public void testFailIfInvalidHeader() throws IOException {
+ initLogFileWriter();
+ writer.close(); // Writes valid trailer
+ byte[] data = writerBaos.toByteArray();
+ LogFileTestUtil.SeekableByteArrayInputStream input =
+ new LogFileTestUtil.SeekableByteArrayInputStream(data);
+ readerContext.setFileSize(data.length);
+ readerContext.setValidateTrailer(true);
+ data[0] = (byte) 'X'; // Corrupt the first magic byte
+ try {
+ reader.init(readerContext, input);
+ fail("Expected InvalidLogHeaderException for file with corrupted header
magic");
+ } catch (InvalidLogHeaderException e) {
+ assertTrue("Exception message should contain 'Bad magic'",
+ e.getMessage().contains("Bad magic"));
+ }
+ }
+
+ @Test
+ public void testFailIfMissingTrailer() throws IOException {
+ initLogFileWriter();
+ writeBlock(writer, "B1", 0, 5);
+ // Don't close the writer, simulate missing trailer
+ byte[] data = writerBaos.toByteArray();
+ // Re-initialize reader with truncated data and trailer validation enabled
+ LogFileTestUtil.SeekableByteArrayInputStream input =
+ new LogFileTestUtil.SeekableByteArrayInputStream(data);
+ readerContext.setFileSize(data.length);
+ // Enable trailer validation
+ readerContext.setValidateTrailer(true);
+ try {
+ reader.init(readerContext, input);
+ fail("Expected InvalidLogTrailerException when trailer is missing");
+ } catch (InvalidLogTrailerException e) {
+ assertTrue("Exception message should contain 'Unsupported version'",
+ e.getMessage().contains("Unsupported version"));
+ }
+ }
+
+ @Test
+ public void testFailIfInvalidTrailer() throws IOException {
+ initLogFileWriter();
+ writeBlock(writer, "B1", 0, 5);
+ writer.close(); // Writes valid trailer
+ byte[] data = writerBaos.toByteArray();
+ // Corrupt the trailer by changing the magic bytes
+ int trailerStartOffset = data.length - LogFileTrailer.FIXED_TRAILER_SIZE;
+ int magicOffset =
+ trailerStartOffset + LogFileTrailer.FIXED_TRAILER_SIZE -
LogFileHeader.MAGIC.length;
+ data[magicOffset] = (byte) 'X'; // Corrupt the first magic byte
+ // Re-initialize reader with corrupted trailer and trailer validation enabled
+ LogFileTestUtil.SeekableByteArrayInputStream input =
+ new LogFileTestUtil.SeekableByteArrayInputStream(data);
+ readerContext.setFileSize(data.length);
+ readerContext.setValidateTrailer(true);
+ try {
+ reader.init(readerContext, input);
+ fail("Expected InvalidLogTrailerException when trailer magic is
corrupt");
+ } catch (InvalidLogTrailerException e) {
+ assertTrue("Exception message should contain 'Bad magic'",
+ e.getMessage().contains("Bad magic"));
+ }
+ }
+
@Test
public void testLogFileCorruptionFirstBlockChecksum() throws IOException {
initLogFileWriter();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
index 36dde2fc83..95c6bb5133 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
@@ -137,6 +137,16 @@ public class LogFileWriterTest {
reader.close();
}
+ @Test
+ public void testHeaderWrittenImmediately() throws IOException {
+ // This should write header immediately
+ initLogFileWriter();
+ // Verify file exists and has content (header should be written)
+ assertTrue("File should exist after init", localFs.exists(filePath));
+ assertEquals("File should have header written", LogFileHeader.HEADERSIZE,
writer.getLength());
+ writer.close();
+ }
+
private void initLogFileReader() throws IOException {
readerContext = new
LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(filePath);
reader.init(readerContext);