This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new e70d743cd [server] Recover log and index file for unclean shutdown and 
optimize log recovery time (#2044)
e70d743cd is described below

commit e70d743cd09d2e9b305106cd2932520c339a42e1
Author: Liebing <[email protected]>
AuthorDate: Sat Nov 29 23:30:43 2025 +0800

    [server] Recover log and index file for unclean shutdown and optimize log 
recovery time (#2044)
---
 .../org/apache/fluss/server/log/LogLoader.java     | 136 +++++++++
 .../org/apache/fluss/server/log/LogSegment.java    |  21 +-
 .../org/apache/fluss/server/log/LogTablet.java     |   2 +-
 .../fluss/server/log/WriterStateManager.java       |   4 +
 .../org/apache/fluss/server/log/LogLoaderTest.java | 328 +++++++++++++++++++++
 5 files changed, 489 insertions(+), 2 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
index 62d38581c..572fbd304 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
@@ -19,9 +19,11 @@ package org.apache.fluss.server.log;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidOffsetException;
 import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
 import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.server.exception.CorruptIndexException;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.types.Tuple2;
 
@@ -31,8 +33,13 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -129,6 +136,61 @@ final class LogLoader {
      *     overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
+        if (!isCleanShutdown) {
+            List<LogSegment> unflushed =
+                    logSegments.values(recoveryPointCheckpoint, 
Long.MAX_VALUE);
+            int numUnflushed = unflushed.size();
+            Iterator<LogSegment> unflushedIter = unflushed.iterator();
+            boolean truncated = false;
+            int numFlushed = 1;
+
+            while (unflushedIter.hasNext() && !truncated) {
+                LogSegment segment = unflushedIter.next();
+                LOG.info(
+                        "Recovering unflushed segment {}. {}/{} recovered for 
bucket {}",
+                        segment.getBaseOffset(),
+                        numFlushed,
+                        numUnflushed,
+                        logSegments.getTableBucket());
+
+                try {
+                    segment.sanityCheck();
+                } catch (NoSuchFileException | CorruptIndexException e) {
+                    LOG.warn(
+                            "Found invalid index file corresponding log file 
{} for bucket {}, "
+                                    + "recovering segment and rebuilding index 
files...",
+                            
segment.getFileLogRecords().file().getAbsoluteFile(),
+                            logSegments.getTableBucket(),
+                            e);
+
+                    int truncatedBytes = -1;
+                    try {
+                        truncatedBytes = recoverSegment(segment);
+                    } catch (InvalidOffsetException invalidOffsetException) {
+                        long startOffset = segment.getBaseOffset();
+                        LOG.warn(
+                                "Found invalid offset during recovery for 
bucket {}. Deleting the corrupt segment "
+                                        + "and creating an empty one with 
starting offset {}",
+                                logSegments.getTableBucket(),
+                                startOffset);
+                        truncatedBytes = segment.truncateTo(startOffset);
+                    }
+
+                    if (truncatedBytes > 0) {
+                        // we had an invalid message, delete all remaining log
+                        LOG.warn(
+                                "Corruption found in segment {} for bucket {}, 
truncating to offset {}",
+                                segment.getBaseOffset(),
+                                logSegments.getTableBucket(),
+                                segment.readNextOffset());
+                        removeAndDeleteSegments(unflushedIter);
+                        truncated = true;
+                    }
+                }
+                numFlushed += 1;
+            }
+        }
+
         // TODO truncate log to recover maybe unflush segments.
         if (logSegments.isEmpty()) {
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, 
logFormat));
@@ -137,6 +199,80 @@ final class LogLoader {
         return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
     }
 
+    /**
+     * This method deletes the given log segments and the associated writer 
snapshots.
+     *
+     * <p>This method does not need to convert IOException to {@link 
LogStorageException} because it
+     * is either called before all logs are loaded or the immediate caller 
will catch and handle
+     * IOException
+     *
+     * @param segmentsToDelete The log segments to schedule for deletion
+     */
+    private void removeAndDeleteSegments(Iterator<LogSegment> 
segmentsToDelete) {
+        if (segmentsToDelete.hasNext()) {
+            List<LogSegment> toDelete = new ArrayList<>();
+            segmentsToDelete.forEachRemaining(toDelete::add);
+
+            LOG.info(
+                    "Deleting segments for bucket {} as part of log recovery: 
{}",
+                    logSegments.getTableBucket(),
+                    
toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
+            toDelete.forEach(segment -> 
logSegments.remove(segment.getBaseOffset()));
+
+            try {
+                LocalLog.deleteSegmentFiles(
+                        toDelete, 
LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated segments {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+
+            try {
+                LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated writer snapshots {} for 
bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Just recovers the given segment, without adding it to the provided 
segments.
+     *
+     * @param segment Segment to recover
+     * @return The number of bytes truncated from the segment
+     * @throws LogSegmentOffsetOverflowException if the segment contains 
messages that cause index
+     *     offset overflow
+     */
+    private int recoverSegment(LogSegment segment) throws IOException {
+        WriterStateManager writerStateManager =
+                new WriterStateManager(
+                        logSegments.getTableBucket(),
+                        logTabletDir,
+                        this.writerStateManager.writerExpirationMs());
+        // TODO, Here, we use 0 as the logStartOffset passed into 
rebuildWriterState. The reason is
+        // that the current implementation of logStartOffset in Fluss is not 
yet fully refined, and
+        // there may be cases where logStartOffset is not updated. As a 
result, logStartOffset is
+        // not yet reliable. Once the issue with correctly updating 
logStartOffset is resolved in
+        // issue https://github.com/apache/fluss/issues/744, we can use 
logStartOffset here.
+        // Additionally, using 0 versus using logStartOffset does not affect 
correctness—they both
+        // can restore the complete WriterState. The only difference is that 
using logStartOffset
+        // can potentially skip over more segments.
+        LogTablet.rebuildWriterState(
+                writerStateManager, logSegments, 0, segment.getBaseOffset(), 
false);
+        int bytesTruncated = segment.recover();
+        // once we have recovered the segment's data, take a snapshot to 
ensure that we won't
+        // need to reload the same segment again while recovering another 
segment.
+        writerStateManager.takeSnapshot();
+        return bytesTruncated;
+    }
+
     /** Loads segments from disk into the provided segments. */
     private void loadSegmentFiles() throws IOException {
         File[] sortedFiles = logTabletDir.listFiles();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
index 0a30d5f5f..c45c25e5c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
@@ -44,6 +44,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.NoSuchFileException;
 import java.util.Optional;
 
 import static 
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
@@ -172,6 +173,24 @@ public final class LogSegment {
         timeIndex().resize(size);
     }
 
+    public void sanityCheck() throws IOException {
+        if (!lazyOffsetIndex.file().exists()) {
+            throw new NoSuchFileException(
+                    "Offset index file "
+                            + lazyOffsetIndex.file().getAbsolutePath()
+                            + " does not exist.");
+        }
+        lazyOffsetIndex.get().sanityCheck();
+
+        if (!lazyTimeIndex.file().exists()) {
+            throw new NoSuchFileException(
+                    "Time index file "
+                            + lazyTimeIndex.file().getAbsolutePath()
+                            + " does not exist.");
+        }
+        lazyTimeIndex.get().sanityCheck();
+    }
+
     /**
      * The maximum timestamp we see so far.
      *
@@ -284,7 +303,7 @@ public final class LogSegment {
      * Run recovery on the given segment. This will rebuild the index from the 
log file and lop off
      * any invalid bytes from the end of the log and index.
      */
-    public int recover() throws Exception {
+    public int recover() throws IOException {
         offsetIndex().reset();
         timeIndex().reset();
         int validBytes = 0;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index bf75410e4..8a5c54cbd 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -1283,7 +1283,7 @@ public final class LogTablet {
         loadedWriters.values().forEach(writerStateManager::update);
     }
 
-    private static void deleteWriterSnapshots(
+    public static void deleteWriterSnapshots(
             List<LogSegment> segments, WriterStateManager writerStateManager) 
throws IOException {
         for (LogSegment segment : segments) {
             
writerStateManager.removeAndDeleteSnapshot(segment.getBaseOffset());
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
index 4fa0baced..30afb849a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
@@ -99,6 +99,10 @@ public class WriterStateManager {
         this.snapshots = loadSnapshots();
     }
 
+    public int writerExpirationMs() {
+        return writerExpirationMs;
+    }
+
     public int writerIdCount() {
         return writerIdCount;
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
new file mode 100644
index 000000000..795b91cff
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.compression.ArrowCompressionInfo;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogTestBase;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.exception.CorruptIndexException;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.ManualClock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static 
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LogLoader}. */
+final class LogLoaderTest extends LogTestBase {
+
+    private @TempDir File tempDir;
+    private FlussScheduler scheduler;
+    private File logDir;
+    private Clock clock;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, 
MemorySize.parse("10kb"));
+        conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, 
MemorySize.parse("1b"));
+
+        logDir =
+                LogTestUtils.makeRandomLogTabletDir(
+                        tempDir,
+                        DATA1_TABLE_PATH.getDatabaseName(),
+                        DATA1_TABLE_ID,
+                        DATA1_TABLE_PATH.getTableName());
+
+        scheduler = new FlussScheduler(1);
+        scheduler.startup();
+
+        clock = new ManualClock();
+    }
+
+    // TODO: add more tests like Kafka LogLoaderTest
+
+    @Test
+    void testCorruptIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), 
StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopen the log without recovery, sanity check of index files 
should throw exception
+        logTablet = createLogTablet(true);
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (segment.getBaseOffset() != 
logTablet.activeLogSegment().getBaseOffset()) {
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d 
bytes which is neither positive nor a multiple of %d",
+                                        
segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index 
file (%s) has non-zero size but the last timestamp is 0 which is less than the 
first timestamp",
+                                        
segment.timeIndex().file().getAbsolutePath()));
+            } else {
+                // the offset index file of active segment will be resized, 
which case no corruption
+                // exception when doing sanity check
+                segment.offsetIndex().sanityCheck();
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index 
file (%s) has non-zero size but the last timestamp is 0 which is less than the 
first timestamp",
+                                        
segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+        logTablet.close();
+
+        // test reopen the log with recovery, sanity check of index files 
should no corruption
+        logTablet = createLogTablet(false);
+        for (LogSegment segment : logTablet.logSegments()) {
+            segment.offsetIndex().sanityCheck();
+            segment.timeIndex().sanityCheck();
+        }
+        assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() 
+ i * 10))
+                    .isEqualTo(i);
+        }
+        logTablet.close();
+    }
+
+    @Test
+    void testCorruptIndexRebuildWithRecoveryPoint() throws Exception {
+        // publish the records and close the log
+        int numRecords = 100;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        long recoveryPoint = logTablet.localLogEndOffset() / 2;
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), 
StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopen the log with recovery point
+        logTablet = createLogTablet(false, recoveryPoint);
+        List<LogSegment> logSegments = logTablet.logSegments(recoveryPoint, 
Long.MAX_VALUE);
+        assertThat(logSegments.size() < 
logTablet.logSegments().size()).isTrue();
+        Set<Long> recoveredSegments =
+                
logSegments.stream().map(LogSegment::getBaseOffset).collect(Collectors.toSet());
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (recoveredSegments.contains(segment.getBaseOffset())) {
+                segment.offsetIndex().sanityCheck();
+                segment.timeIndex().sanityCheck();
+            } else {
+                // the segments before recovery point will not be recovered, 
so sanity check should
+                // still throw corrupt exception
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d 
bytes which is neither positive nor a multiple of %d",
+                                        
segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index 
file (%s) has non-zero size but the last timestamp is 0 which is less than the 
first timestamp",
+                                        
segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+    }
+
+    @Test
+    void testIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // delete all the index files
+        indexFiles.forEach(File::delete);
+
+        // reopen the log
+        logTablet = createLogTablet(false);
+        assertThat(logTablet.localLogEndOffset()).isEqualTo(numRecords);
+        // the index files should be rebuilt
+        assertThat(logTablet.logSegments().get(0).offsetIndex().entries() > 
0).isTrue();
+        assertThat(logTablet.logSegments().get(0).timeIndex().entries() > 
0).isTrue();
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() 
+ i * 10))
+                    .isEqualTo(i);
+        }
+        logTablet.close();
+    }
+
+    @Test
+    void testInvalidOffsetRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+
+        List<LogSegment> logSegments = logTablet.logSegments();
+        int corruptSegmentIndex = logSegments.size() / 2;
+        assertThat(corruptSegmentIndex < logSegments.size()).isTrue();
+        LogSegment corruptSegment = logSegments.get(corruptSegmentIndex);
+
+        // append an invalid offset batch
+        List<Object[]> objects = Collections.singletonList(new Object[] {1, 
"a"});
+        List<ChangeType> changeTypes =
+                objects.stream().map(row -> 
ChangeType.APPEND_ONLY).collect(Collectors.toList());
+        MemoryLogRecords memoryLogRecords =
+                createBasicMemoryLogRecords(
+                        DATA1_ROW_TYPE,
+                        DEFAULT_SCHEMA_ID,
+                        corruptSegment.getBaseOffset(),
+                        clock.milliseconds(),
+                        magic,
+                        System.currentTimeMillis(),
+                        0,
+                        changeTypes,
+                        objects,
+                        LogFormat.ARROW,
+                        ArrowCompressionInfo.DEFAULT_COMPRESSION);
+        corruptSegment.getFileLogRecords().append(memoryLogRecords);
+        logTablet.close();
+
+        // delete the index file to trigger the recovery
+        corruptSegment.offsetIndex().deleteIfExists();
+
+        logTablet = createLogTablet(false);
+        // the corrupt segment should be truncated to base offset
+        
assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset());
+        // segments after the corrupt segment should be removed
+        
assertThat(logTablet.logSegments().size()).isEqualTo(corruptSegmentIndex + 1);
+    }
+
+    private LogTablet createLogTablet(boolean isCleanShutdown) throws 
Exception {
+        return createLogTablet(isCleanShutdown, 0);
+    }
+
+    private LogTablet createLogTablet(boolean isCleanShutdown, long 
recoveryPoint)
+            throws Exception {
+        return LogTablet.create(
+                PhysicalTablePath.of(DATA1_TABLE_PATH),
+                logDir,
+                conf,
+                TestingMetricGroups.TABLET_SERVER_METRICS,
+                recoveryPoint,
+                scheduler,
+                LogFormat.ARROW,
+                1,
+                false,
+                SystemClock.getInstance(),
+                isCleanShutdown);
+    }
+
+    private void appendRecords(LogTablet logTablet, int numRecords) throws 
Exception {
+        int baseOffset = 0;
+        int batchSequence = 0;
+        for (int i = 0; i < numRecords; i++) {
+            List<Object[]> objects = Collections.singletonList(new Object[] 
{1, "a"});
+            List<ChangeType> changeTypes =
+                    objects.stream()
+                            .map(row -> ChangeType.APPEND_ONLY)
+                            .collect(Collectors.toList());
+            MemoryLogRecords memoryLogRecords =
+                    createBasicMemoryLogRecords(
+                            DATA1_ROW_TYPE,
+                            DEFAULT_SCHEMA_ID,
+                            baseOffset,
+                            clock.milliseconds() + i * 10L,
+                            magic,
+                            System.currentTimeMillis(),
+                            batchSequence,
+                            changeTypes,
+                            objects,
+                            LogFormat.ARROW,
+                            ArrowCompressionInfo.DEFAULT_COMPRESSION);
+            logTablet.appendAsFollower(memoryLogRecords);
+            baseOffset++;
+            batchSequence++;
+        }
+    }
+
+    private List<File> collectIndexFiles(List<LogSegment> logSegments) throws 
IOException {
+        List<File> indexFiles = new ArrayList<>();
+        for (LogSegment segment : logSegments) {
+            indexFiles.add(segment.offsetIndex().file());
+            indexFiles.add(segment.timeIndex().file());
+        }
+        return indexFiles;
+    }
+}

Reply via email to