wuchong commented on code in PR #1749:
URL: https://github.com/apache/fluss/pull/1749#discussion_r2490431910


##########
fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.compression.ArrowCompressionInfo;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogTestBase;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.exception.CorruptIndexException;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.ManualClock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static 
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LogLoader}. */
+final class LogLoaderTest extends LogTestBase {
+
+    private @TempDir File tempDir;
+    private FlussScheduler scheduler;
+    private File logDir;
+    private Clock clock;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, 
MemorySize.parse("10kb"));
+        conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, 
MemorySize.parse("1b"));
+
+        logDir =
+                LogTestUtils.makeRandomLogTabletDir(
+                        tempDir,
+                        DATA1_TABLE_PATH.getDatabaseName(),
+                        DATA1_TABLE_ID,
+                        DATA1_TABLE_PATH.getTableName());
+
+        scheduler = new FlussScheduler(1);
+        scheduler.startup();
+
+        clock = new ManualClock();
+    }
+
+    // TODO: add more tests like Kafka LogLoaderTest
+
+    @Test
+    void testCorruptIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), 
StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopen the log without recovery, sanity check of index files 
should throw exception
+        logTablet = createLogTablet(true);
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (segment.getBaseOffset() != 
logTablet.activeLogSegment().getBaseOffset()) {
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d 
bytes which is neither positive nor a multiple of %d",
+                                        
segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index 
file (%s) has non-zero size but the last timestamp is 0 which is less than the 
first timestamp",
+                                        
segment.timeIndex().file().getAbsolutePath()));
+            } else {
+                // the offset index file of active segment will be resized, 
which case no corruption
+                // exception when doing sanity check
+                segment.offsetIndex().sanityCheck();
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index 
file (%s) has non-zero size but the last timestamp is 0 which is less than the 
first timestamp",
+                                        
segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+        logTablet.close();
+
+        // test reopen the log with recovery, sanity check of index files 
should no corruption
+        logTablet = createLogTablet(false);
+        for (LogSegment segment : logTablet.logSegments()) {
+            segment.offsetIndex().sanityCheck();
+            segment.timeIndex().sanityCheck();
+        }
+        assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(i)
+                    
.isEqualTo(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10));
+        }
+        logTablet.close();
+    }
+
+    @Test
+    void testCorruptIndexRebuildWithRecoveryPoint() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        long recoveryPoint = logTablet.localLogEndOffset() / 2;
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), 
StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopen the log with recovery point
+        logTablet = createLogTablet(false, recoveryPoint);
+        List<LogSegment> logSegments = logTablet.logSegments(recoveryPoint, 
Long.MAX_VALUE);
+        assertThat(logSegments.size() < 
logTablet.logSegments().size()).isTrue();
+        Set<Long> recoveredSegments =
+                
logSegments.stream().map(LogSegment::getBaseOffset).collect(Collectors.toSet());
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (recoveredSegments.contains(segment.getBaseOffset())) {
+                segment.offsetIndex().sanityCheck();
+                segment.timeIndex().sanityCheck();
+            } else {
+                // the segments before recovery point will not be recovered, 
so sanity check should
+                // still throw corrupt exception
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d 
bytes which is neither positive nor a multiple of %d",
+                                        
segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index 
file (%s) has non-zero size but the last timestamp is 0 which is less than the 
first timestamp",
+                                        
segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+    }
+
+    @Test
+    void testIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // delete all the index files
+        indexFiles.forEach(File::delete);
+
+        // reopen the log
+        logTablet = createLogTablet(false);
+        assertThat(logTablet.localLogEndOffset()).isEqualTo(numRecords);
+        // the index files should be rebuilt
+        assertThat(logTablet.logSegments().get(0).offsetIndex().entries() > 
0).isTrue();
+        assertThat(logTablet.logSegments().get(0).timeIndex().entries() > 
0).isTrue();
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(i)
+                    
.isEqualTo(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10));

Review Comment:
   Ditto: the actual value should be in `assertThat(..)` and the expected value
   in `isEqualTo(..)`.



##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java:
##########
@@ -155,8 +281,26 @@ private void loadSegmentFiles() throws IOException {
                         }
                     } else if (LocalLog.isLogFile(file)) {
                         long baseOffset = FlussPaths.offsetFromFile(file);
+                        boolean timeIndexFileNewlyCreated =
+                                !FlussPaths.timeIndexFile(logTabletDir, 
baseOffset).exists();
                         LogSegment segment =
                                 LogSegment.open(logTabletDir, baseOffset, 
conf, true, 0, logFormat);
+
+                        try {
+                            segment.sanityCheck(timeIndexFileNewlyCreated);
+                        } catch (Exception e) {
+                            if (e instanceof NoSuchFieldException) {
+                                if (isCleanShutdown
+                                        || segment.getBaseOffset() < 
recoveryPointCheckpoint) {
+                                    LOG.error(
+                                            "Could not find offset index file 
corresponding to log file {} "
+                                                    + "for bucket {}, 
recovering segment and rebuilding index files...",
+                                            logSegments.getTableBucket(),
+                                            
segment.getFileLogRecords().file().getAbsoluteFile());
+                                }
+                                recoverSegment(segment);
+                            }

Review Comment:
   Should we catch `CorruptIndexException` and recover the segment in this case,
   as Kafka does? We should also rethrow any other exception in an else branch.



##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java:
##########
@@ -129,16 +165,106 @@ public LoadedLogOffsets load() throws IOException {
      *     overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
-        // TODO truncate log to recover maybe unflush segments.
+        if (!isCleanShutdown) {
+            List<LogSegment> unflushed =
+                    logSegments.values(recoveryPointCheckpoint, 
Long.MAX_VALUE);
+            int numUnflushed = unflushed.size();
+            Iterator<LogSegment> unflushedIter = unflushed.iterator();
+            boolean truncated = false;
+            int numFlushed = 1;
+
+            while (unflushedIter.hasNext() && !truncated) {
+                LogSegment segment = unflushedIter.next();
+                LOG.info(
+                        "Recovering unflushed segment {}. {}/{} recovered for 
bucket {}",
+                        segment.getBaseOffset(),
+                        numFlushed,
+                        numUnflushed,
+                        logSegments.getTableBucket());
+
+                int truncatedBytes = -1;
+                try {
+                    truncatedBytes = recoverSegment(segment);
+                } catch (Exception e) {
+                    if (e instanceof InvalidOffsetException) {
+                        long startOffset = segment.getBaseOffset();
+                        LOG.warn(
+                                "Found invalid offset during recovery for 
bucket {}. Deleting the corrupt segment "
+                                        + "and creating an empty one with 
starting offset {}",
+                                logSegments.getTableBucket(),
+                                startOffset);
+                        truncatedBytes = segment.truncateTo(startOffset);
+                    }

Review Comment:
   We should rethrow the exception in the else branch. Otherwise, the exception 
is swallowed. 



##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java:
##########
@@ -117,6 +122,37 @@ public LoadedLogOffsets load() throws IOException {
                         nextOffset, activeSegment.getBaseOffset(), 
activeSegment.getSizeInBytes()));
     }
 
+    /**
+     * Just recovers the given segment, without adding it to the provided 
segments.
+     *
+     * @param segment Segment to recover
+     * @return The number of bytes truncated from the segment
+     * @throws LogSegmentOffsetOverflowException if the segment contains 
messages that cause index
+     *     offset overflow
+     */
+    private int recoverSegment(LogSegment segment) throws Exception {
+        WriterStateManager writerStateManager =
+                new WriterStateManager(
+                        logSegments.getTableBucket(),
+                        logTabletDir,
+                        this.writerStateManager.writerExpirationMs());
+        // TODO, Here, we use 0 as the logStartOffset passed into 
rebuildWriterState. The reason is
+        // that the current implementation of logStartOffset in Fluss is not 
yet fully refined, and
+        // there may be cases where logStartOffset is not updated. As a 
result, logStartOffset is
+        // not yet reliable. Once the issue with correctly updating 
logStartOffset is resolved in
+        // issue https://github.com/apache/fluss/issues/744, we can use 
logStartOffset here.
+        // Additionally, using 0 versus using logStartOffset does not affect 
correctness—they both
+        // can restore the complete WriterState. The only difference is that 
using logStartOffset
+        // can potentially skip over more segments.
+        LogTablet.rebuildWriterState(
+                writerStateManager, logSegments, 0, segment.getBaseOffset(), 
false);
+        int bytesTruncated = segment.recover();

Review Comment:
   Do we need to pass `writerStateManager` into `segment.recover()` to update the
   writer state, as Kafka does? cc @swuferhong



##########
fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.compression.ArrowCompressionInfo;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogTestBase;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.exception.CorruptIndexException;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.ManualClock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static 
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LogLoader}. */
+final class LogLoaderTest extends LogTestBase {
+
+    private @TempDir File tempDir;
+    private FlussScheduler scheduler;
+    private File logDir;
+    private Clock clock;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, 
MemorySize.parse("10kb"));
+        conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, 
MemorySize.parse("1b"));
+
+        logDir =
+                LogTestUtils.makeRandomLogTabletDir(
+                        tempDir,
+                        DATA1_TABLE_PATH.getDatabaseName(),
+                        DATA1_TABLE_ID,
+                        DATA1_TABLE_PATH.getTableName());
+
+        scheduler = new FlussScheduler(1);
+        scheduler.startup();
+
+        clock = new ManualClock();
+    }
+
+    // TODO: add more tests like Kafka LogLoaderTest
+
+    @Test
+    void testCorruptIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), 
StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopen the log without recovery, sanity check of index files 
should throw exception
+        logTablet = createLogTablet(true);
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (segment.getBaseOffset() != 
logTablet.activeLogSegment().getBaseOffset()) {
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d 
bytes which is neither positive nor a multiple of %d",
+                                        
segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index 
file (%s) has non-zero size but the last timestamp is 0 which is less than the 
first timestamp",
+                                        
segment.timeIndex().file().getAbsolutePath()));
+            } else {
+                // the offset index file of active segment will be resized, 
which case no corruption
+                // exception when doing sanity check
+                segment.offsetIndex().sanityCheck();
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index 
file (%s) has non-zero size but the last timestamp is 0 which is less than the 
first timestamp",
+                                        
segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+        logTablet.close();
+
+        // test reopen the log with recovery, sanity check of index files 
should no corruption
+        logTablet = createLogTablet(false);
+        for (LogSegment segment : logTablet.logSegments()) {
+            segment.offsetIndex().sanityCheck();
+            segment.timeIndex().sanityCheck();
+        }
+        assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(i)
+                    
.isEqualTo(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10));

Review Comment:
   ```
               
assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10))
                       .isEqualTo(i);
   ```
   
   The actual value should be in `assertThat(..)`, and the expected value in `isEqualTo(..)`.



##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java:
##########
@@ -172,6 +172,23 @@ public void resizeIndexes(int size) throws IOException {
         timeIndex().resize(size);
     }
 
+    public void sanityCheck(boolean timeIndexFileNewlyCreated) throws 
Exception {
+        if (lazyOffsetIndex.file().exists()) {
+            // Resize the time index file to 0 if it is newly created.
+            if (timeIndexFileNewlyCreated) {
+                timeIndex().resize(0);
+            }
+            // Sanity checks for time index and offset index are skipped 
because
+            // we will recover the segments above the recovery point in 
recoverLog()
+            // in any case so sanity checking them here is redundant.
+        } else {
+            throw new NoSuchFieldException(

Review Comment:
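   This should throw `NoSuchFileException` (`java.nio.file`), not
   `NoSuchFieldException`, which is a reflection exception unrelated to missing files.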



##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java:
##########
@@ -129,16 +165,106 @@ public LoadedLogOffsets load() throws IOException {
      *     overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
-        // TODO truncate log to recover maybe unflush segments.
+        if (!isCleanShutdown) {
+            List<LogSegment> unflushed =
+                    logSegments.values(recoveryPointCheckpoint, 
Long.MAX_VALUE);
+            int numUnflushed = unflushed.size();
+            Iterator<LogSegment> unflushedIter = unflushed.iterator();
+            boolean truncated = false;
+            int numFlushed = 1;
+
+            while (unflushedIter.hasNext() && !truncated) {
+                LogSegment segment = unflushedIter.next();
+                LOG.info(
+                        "Recovering unflushed segment {}. {}/{} recovered for 
bucket {}",
+                        segment.getBaseOffset(),
+                        numFlushed,
+                        numUnflushed,
+                        logSegments.getTableBucket());
+
+                int truncatedBytes = -1;
+                try {
+                    truncatedBytes = recoverSegment(segment);
+                } catch (Exception e) {
+                    if (e instanceof InvalidOffsetException) {
+                        long startOffset = segment.getBaseOffset();
+                        LOG.warn(
+                                "Found invalid offset during recovery for 
bucket {}. Deleting the corrupt segment "
+                                        + "and creating an empty one with 
starting offset {}",
+                                logSegments.getTableBucket(),
+                                startOffset);
+                        truncatedBytes = segment.truncateTo(startOffset);
+                    }
+                }
+
+                if (truncatedBytes > 0) {
+                    // we had an invalid message, delete all remaining log
+                    LOG.warn(
+                            "Corruption found in segment {} for bucket {}, 
truncating to offset {}",
+                            segment.getBaseOffset(),
+                            logSegments.getTableBucket(),
+                            segment.readNextOffset());
+                    removeAndDeleteSegments(unflushedIter);
+                    truncated = true;
+                } else {
+                    numFlushed += 1;
+                }
+            }
+        }
+
         if (logSegments.isEmpty()) {
+            // TODO: use logStartOffset if issue 
https://github.com/apache/fluss/issues/744 ready
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, 
logFormat));
         }
         long logEndOffset = logSegments.lastSegment().get().readNextOffset();
         return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
     }
 
+    /**
+     * This method deletes the given log segments and the associated writer 
snapshots.
+     *
+     * <p>This method does not need to convert IOException to {@link 
LogStorageException} because it
+     * is either called before all logs are loaded or the immediate caller 
will catch and handle
+     * IOException
+     *
+     * @param segmentsToDelete The log segments to schedule for deletion
+     */
+    private void removeAndDeleteSegments(Iterator<LogSegment> 
segmentsToDelete) {
+        if (segmentsToDelete.hasNext()) {
+            List<LogSegment> toDelete = new ArrayList<>();
+            segmentsToDelete.forEachRemaining(toDelete::add);
+
+            LOG.info(
+                    "Deleting segments for bucket {} as part of log recovery: 
{}",
+                    logSegments.getTableBucket(),
+                    
toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
+            toDelete.forEach(segment -> 
logSegments.remove(segment.getBaseOffset()));
+
+            try {
+                LocalLog.deleteSegmentFiles(
+                        toDelete, 
LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated segments {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+
+            try {
+                LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated writer snapshots {} for 
bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+        }
+    }
+
     /** Loads segments from disk into the provided segments. */
-    private void loadSegmentFiles() throws IOException {
+    private void loadSegmentFiles() throws Exception {

Review Comment:
   We should keep `throws IOException` instead of the generic `Exception`,
   because this is an I/O operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to