wuchong commented on code in PR #1749: URL: https://github.com/apache/fluss/pull/1749#discussion_r2490431910
########## fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java: ##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.compression.ArrowCompressionInfo;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogTestBase;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.exception.CorruptIndexException;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.ManualClock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LogLoader}. */
+final class LogLoaderTest extends LogTestBase {
+
+    private @TempDir File tempDir;
+    private FlussScheduler scheduler;
+    private File logDir;
+    private Clock clock;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("10kb"));
+        conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, MemorySize.parse("1b"));
+
+        logDir =
+                LogTestUtils.makeRandomLogTabletDir(
+                        tempDir,
+                        DATA1_TABLE_PATH.getDatabaseName(),
+                        DATA1_TABLE_ID,
+                        DATA1_TABLE_PATH.getTableName());
+
+        scheduler = new FlussScheduler(1);
+        scheduler.startup();
+
+        clock = new ManualClock();
+    }
+
+    // TODO: add more tests like Kafka LogLoaderTest
+
+    @Test
+    void testCorruptIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopening the log without recovery; the sanity check of index files should
+        // throw an exception
+        logTablet = createLogTablet(true);
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (segment.getBaseOffset() != logTablet.activeLogSegment().getBaseOffset()) {
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d bytes which is neither positive nor a multiple of %d",
+                                        segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
+                                        segment.timeIndex().file().getAbsolutePath()));
+            } else {
+                // the offset index file of the active segment will be resized, in which case
+                // no corruption exception is thrown by the sanity check
+                segment.offsetIndex().sanityCheck();
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
+                                        segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+        logTablet.close();
+
+        // test reopening the log with recovery; the sanity check of index files should find
+        // no corruption
+        logTablet = createLogTablet(false);
+        for (LogSegment segment : logTablet.logSegments()) {
+            segment.offsetIndex().sanityCheck();
+            segment.timeIndex().sanityCheck();
+        }
+        assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(i)
+                    .isEqualTo(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10));
+        }
+        logTablet.close();
+    }
+
+    @Test
+    void testCorruptIndexRebuildWithRecoveryPoint() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        long recoveryPoint = logTablet.localLogEndOffset() / 2;
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopen the log with recovery point
+        logTablet = createLogTablet(false, recoveryPoint);
+        List<LogSegment> logSegments = logTablet.logSegments(recoveryPoint, Long.MAX_VALUE);
+        assertThat(logSegments.size() < logTablet.logSegments().size()).isTrue();
+        Set<Long> recoveredSegments =
+                logSegments.stream().map(LogSegment::getBaseOffset).collect(Collectors.toSet());
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (recoveredSegments.contains(segment.getBaseOffset())) {
+                segment.offsetIndex().sanityCheck();
+                segment.timeIndex().sanityCheck();
+            } else {
+                // the segments before the recovery point will not be recovered, so the sanity
+                // check should still throw a corruption exception
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d bytes which is neither positive nor a multiple of %d",
+                                        segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
+                                        segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+    }
+
+    @Test
+    void testIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // delete all the index files
+        indexFiles.forEach(File::delete);
+
+        // reopen the log
+        logTablet = createLogTablet(false);
+        assertThat(logTablet.localLogEndOffset()).isEqualTo(numRecords);
+        // the index files should be rebuilt
+        assertThat(logTablet.logSegments().get(0).offsetIndex().entries() > 0).isTrue();
+        assertThat(logTablet.logSegments().get(0).timeIndex().entries() > 0).isTrue();
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(i)
+                    .isEqualTo(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10));

Review Comment:
   ditto

########## fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java: ##########
@@ -155,8 +281,26 @@ private void loadSegmentFiles() throws IOException {
             }
         } else if (LocalLog.isLogFile(file)) {
             long baseOffset = FlussPaths.offsetFromFile(file);
+            boolean timeIndexFileNewlyCreated =
+                    !FlussPaths.timeIndexFile(logTabletDir, baseOffset).exists();
             LogSegment segment =
                     LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
+
+            try {
+                segment.sanityCheck(timeIndexFileNewlyCreated);
+            } catch (Exception e) {
+                if (e instanceof NoSuchFieldException) {
+                    if (isCleanShutdown
+                            || segment.getBaseOffset() < recoveryPointCheckpoint) {
+                        LOG.error(
+                                "Could not find offset index file corresponding to log file {} "
+                                        + "for bucket {}, recovering segment and rebuilding index files...",
+                                logSegments.getTableBucket(),
+                                segment.getFileLogRecords().file().getAbsoluteFile());
+                    }
+                    recoverSegment(segment);
+                }

Review Comment:
   Should we catch `CorruptIndexException` and recover the segment in this case, like Kafka does? And we should rethrow other exceptions in the else branch.

########## fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java: ##########
@@ -129,16 +165,106 @@ public LoadedLogOffsets load() throws IOException {
      * overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
-        // TODO truncate log to recover maybe unflush segments.
+        if (!isCleanShutdown) {
+            List<LogSegment> unflushed =
+                    logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
+            int numUnflushed = unflushed.size();
+            Iterator<LogSegment> unflushedIter = unflushed.iterator();
+            boolean truncated = false;
+            int numFlushed = 1;
+
+            while (unflushedIter.hasNext() && !truncated) {
+                LogSegment segment = unflushedIter.next();
+                LOG.info(
+                        "Recovering unflushed segment {}. {}/{} recovered for bucket {}",
+                        segment.getBaseOffset(),
+                        numFlushed,
+                        numUnflushed,
+                        logSegments.getTableBucket());
+
+                int truncatedBytes = -1;
+                try {
+                    truncatedBytes = recoverSegment(segment);
+                } catch (Exception e) {
+                    if (e instanceof InvalidOffsetException) {
+                        long startOffset = segment.getBaseOffset();
+                        LOG.warn(
+                                "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
+                                        + "and creating an empty one with starting offset {}",
+                                logSegments.getTableBucket(),
+                                startOffset);
+                        truncatedBytes = segment.truncateTo(startOffset);
+                    }

Review Comment:
   We should rethrow the exception in the else branch. Otherwise, the exception is swallowed.

########## fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java: ##########
@@ -117,6 +122,37 @@ public LoadedLogOffsets load() throws IOException {
                         nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));
     }
 
+    /**
+     * Just recovers the given segment, without adding it to the provided segments.
+     *
+     * @param segment Segment to recover
+     * @return The number of bytes truncated from the segment
+     * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
+     *     offset overflow
+     */
+    private int recoverSegment(LogSegment segment) throws Exception {
+        WriterStateManager writerStateManager =
+                new WriterStateManager(
+                        logSegments.getTableBucket(),
+                        logTabletDir,
+                        this.writerStateManager.writerExpirationMs());
+        // TODO: Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
+        // that the current implementation of logStartOffset in Fluss is not yet fully refined, and
+        // there may be cases where logStartOffset is not updated. As a result, logStartOffset is
+        // not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
+        // issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
+        // Additionally, using 0 versus using logStartOffset does not affect correctness: they both
+        // can restore the complete WriterState. The only difference is that using logStartOffset
+        // can potentially skip over more segments.
+        LogTablet.rebuildWriterState(
+                writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
+        int bytesTruncated = segment.recover();

Review Comment:
   Do we need to pass `writerStateManager` into `segment.recover()` to update the writer state, like Kafka does?
   
   cc @swuferhong
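   A rough sketch of the shape these suggestions point at for `loadSegmentFiles` (a sketch only, not the PR's code: the names come from the diff above, the `CorruptIndexException` branch mirrors Kafka's `LogLoader` and is an assumption about the intended fix, and the log arguments are reordered to match the `{}` placeholders):

```java
// sketch: catch only the recoverable cases; assumes
// import java.nio.file.NoSuchFileException (see the LogSegment comment below)
try {
    segment.sanityCheck(timeIndexFileNewlyCreated);
} catch (NoSuchFileException e) {
    if (isCleanShutdown || segment.getBaseOffset() < recoveryPointCheckpoint) {
        LOG.error(
                "Could not find offset index file corresponding to log file {} "
                        + "for bucket {}, recovering segment and rebuilding index files...",
                segment.getFileLogRecords().file().getAbsoluteFile(),
                logSegments.getTableBucket());
    }
    recoverSegment(segment);
} catch (CorruptIndexException e) {
    LOG.warn(
            "Found a corrupted index corresponding to log file {} for bucket {}, "
                    + "recovering segment and rebuilding index files...",
            segment.getFileLogRecords().file().getAbsoluteFile(),
            logSegments.getTableBucket());
    recoverSegment(segment);
}
// any other exception now propagates instead of being silently swallowed
```

   The same pattern applies to `recoverLog()`: catching `InvalidOffsetException` directly, instead of `catch (Exception e)` plus an `instanceof` test, lets every other exception propagate. On the writer-state question, Kafka passes its producer state manager into `LogSegment.recover(...)` so that batches replayed during recovery also update that state; a Fluss equivalent might look like `segment.recover(writerStateManager)`, though that signature is hypothetical here.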
########## fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java: ##########
@@ -0,0 +1,325 @@
...
+        assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(i)
+                    .isEqualTo(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10));

Review Comment:
   ```
   assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10))
           .isEqualTo(i);
   ```
   The actual value should be in `assertThat(..)` and the expected value in `isEqualTo(..)`; otherwise AssertJ failure messages report the two the wrong way around.
########## fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java: ##########
@@ -172,6 +172,23 @@ public void resizeIndexes(int size) throws IOException {
         timeIndex().resize(size);
     }
 
+    public void sanityCheck(boolean timeIndexFileNewlyCreated) throws Exception {
+        if (lazyOffsetIndex.file().exists()) {
+            // Resize the time index file to 0 if it is newly created.
+            if (timeIndexFileNewlyCreated) {
+                timeIndex().resize(0);
+            }
+            // Sanity checks for time index and offset index are skipped because
+            // we will recover the segments above the recovery point in recoverLog()
+            // in any case so sanity checking them here is redundant.
+        } else {
+            throw new NoSuchFieldException(

Review Comment:
   Should throw `NoSuchFileException` (`java.nio.file.NoSuchFileException`); `NoSuchFieldException` is the reflection exception for a missing class field.
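   For reference, a sketch of the corrected branch (the original message argument is truncated in the diff above, so the path argument here is an assumption):

```java
import java.nio.file.NoSuchFileException;

// in sanityCheck(): the offset index file is genuinely missing on disk,
// which is an I/O condition, so signal it with the NIO exception
throw new NoSuchFileException(lazyOffsetIndex.file().getAbsolutePath());
```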
########## fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java: ##########
@@ -129,16 +165,106 @@ public LoadedLogOffsets load() throws IOException {
...
+                if (truncatedBytes > 0) {
+                    // we had an invalid message, delete all remaining log
+                    LOG.warn(
+                            "Corruption found in segment {} for bucket {}, truncating to offset {}",
+                            segment.getBaseOffset(),
+                            logSegments.getTableBucket(),
+                            segment.readNextOffset());
+                    removeAndDeleteSegments(unflushedIter);
+                    truncated = true;
+                } else {
+                    numFlushed += 1;
+                }
+            }
+        }
 
         if (logSegments.isEmpty()) {
+            // TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
         }
         long logEndOffset = logSegments.lastSegment().get().readNextOffset();
         return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
     }
 
+    /**
+     * This method deletes the given log segments and the associated writer snapshots.
+     *
+     * <p>This method does not need to convert IOException to {@link LogStorageException} because
+     * it is either called before all logs are loaded or the immediate caller will catch and
+     * handle IOException.
+     *
+     * @param segmentsToDelete The log segments to schedule for deletion
+     */
+    private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
+        if (segmentsToDelete.hasNext()) {
+            List<LogSegment> toDelete = new ArrayList<>();
+            segmentsToDelete.forEachRemaining(toDelete::add);
+
+            LOG.info(
+                    "Deleting segments for bucket {} as part of log recovery: {}",
+                    logSegments.getTableBucket(),
+                    toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
+            toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));
+
+            try {
+                LocalLog.deleteSegmentFiles(
+                        toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated segments {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+
+            try {
+                LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated writer snapshots {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+        }
+    }
+
     /** Loads segments from disk into the provided segments. */
-    private void loadSegmentFiles() throws IOException {
+    private void loadSegmentFiles() throws Exception {

Review Comment:
   We should keep `throws IOException` instead of the generic `Exception`, because this is an IO operation.
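   A sketch of the narrowed signature this implies, assuming `sanityCheck` is changed to throw `NoSuchFileException` (an `IOException` subtype) and that `CorruptIndexException` is unchecked, as its Kafka counterpart is:

```java
/** Loads segments from disk into the provided segments. */
private void loadSegmentFiles() throws IOException {
    // NoSuchFileException and CorruptIndexException trigger segment recovery
    // internally (see the sketch above); any other IOException propagates as-is
}
```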
