This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 53b108bcd Revert "[server] Recover log and index file for unclean shutdown (#1749)" (#2036)
53b108bcd is described below
commit 53b108bcdc987ef0744541a73f5a909b8910b4be
Author: yunhong <[email protected]>
AuthorDate: Thu Nov 27 13:38:32 2025 +0800
Revert "[server] Recover log and index file for unclean shutdown (#1749)" (#2036)
This reverts commit d5cb521471eeac7a07120ecde1a199c61c62dd02.
---
.../org/apache/fluss/server/log/LogLoader.java | 151 +---------
.../org/apache/fluss/server/log/LogSegment.java | 20 +-
.../org/apache/fluss/server/log/LogTablet.java | 2 +-
.../fluss/server/log/WriterStateManager.java | 4 -
.../org/apache/fluss/server/log/LogLoaderTest.java | 325 ---------------------
5 files changed, 3 insertions(+), 499 deletions(-)
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
index 95f41c021..62d38581c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
@@ -19,7 +19,6 @@ package org.apache.fluss.server.log;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.InvalidOffsetException;
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.metadata.LogFormat;
@@ -32,13 +31,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.stream.Collectors;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -123,37 +117,6 @@ final class LogLoader {
nextOffset, activeSegment.getBaseOffset(),
activeSegment.getSizeInBytes()));
}
- /**
- * Just recovers the given segment, without adding it to the provided
segments.
- *
- * @param segment Segment to recover
- * @return The number of bytes truncated from the segment
- * @throws LogSegmentOffsetOverflowException if the segment contains
messages that cause index
- * offset overflow
- */
- private int recoverSegment(LogSegment segment) throws IOException {
- WriterStateManager writerStateManager =
- new WriterStateManager(
- logSegments.getTableBucket(),
- logTabletDir,
- this.writerStateManager.writerExpirationMs());
- // TODO, Here, we use 0 as the logStartOffset passed into
rebuildWriterState. The reason is
- // that the current implementation of logStartOffset in Fluss is not
yet fully refined, and
- // there may be cases where logStartOffset is not updated. As a
result, logStartOffset is
- // not yet reliable. Once the issue with correctly updating
logStartOffset is resolved in
- // issue https://github.com/apache/fluss/issues/744, we can use
logStartOffset here.
- // Additionally, using 0 versus using logStartOffset does not affect
correctness—they both
- // can restore the complete WriterState. The only difference is that
using logStartOffset
- // can potentially skip over more segments.
- LogTablet.rebuildWriterState(
- writerStateManager, logSegments, 0, segment.getBaseOffset(),
false);
- int bytesTruncated = segment.recover();
- // once we have recovered the segment's data, take a snapshot to
ensure that we won't
- // need to reload the same segment again while recovering another
segment.
- writerStateManager.takeSnapshot();
- return bytesTruncated;
- }
-
/**
* Recover the log segments (if there was an unclean shutdown). Ensures
there is at least one
* active segment, and returns the updated recovery point and next offset
after recovery.
@@ -166,106 +129,14 @@ final class LogLoader {
* overflow
*/
private Tuple2<Long, Long> recoverLog() throws IOException {
- if (!isCleanShutdown) {
- List<LogSegment> unflushed =
- logSegments.values(recoveryPointCheckpoint,
Long.MAX_VALUE);
- int numUnflushed = unflushed.size();
- Iterator<LogSegment> unflushedIter = unflushed.iterator();
- boolean truncated = false;
- int numFlushed = 1;
-
- while (unflushedIter.hasNext() && !truncated) {
- LogSegment segment = unflushedIter.next();
- LOG.info(
- "Recovering unflushed segment {}. {}/{} recovered for
bucket {}",
- segment.getBaseOffset(),
- numFlushed,
- numUnflushed,
- logSegments.getTableBucket());
-
- int truncatedBytes = -1;
- try {
- truncatedBytes = recoverSegment(segment);
- } catch (Exception e) {
- if (e instanceof InvalidOffsetException) {
- long startOffset = segment.getBaseOffset();
- LOG.warn(
- "Found invalid offset during recovery for
bucket {}. Deleting the corrupt segment "
- + "and creating an empty one with
starting offset {}",
- logSegments.getTableBucket(),
- startOffset);
- truncatedBytes = segment.truncateTo(startOffset);
- } else {
- throw e;
- }
- }
-
- if (truncatedBytes > 0) {
- // we had an invalid message, delete all remaining log
- LOG.warn(
- "Corruption found in segment {} for bucket {},
truncating to offset {}",
- segment.getBaseOffset(),
- logSegments.getTableBucket(),
- segment.readNextOffset());
- removeAndDeleteSegments(unflushedIter);
- truncated = true;
- } else {
- numFlushed += 1;
- }
- }
- }
-
+ // TODO truncate log to recover maybe unflush segments.
if (logSegments.isEmpty()) {
- // TODO: use logStartOffset if issue
https://github.com/apache/fluss/issues/744 ready
logSegments.add(LogSegment.open(logTabletDir, 0L, conf,
logFormat));
}
long logEndOffset = logSegments.lastSegment().get().readNextOffset();
return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
}
- /**
- * This method deletes the given log segments and the associated writer
snapshots.
- *
- * <p>This method does not need to convert IOException to {@link
LogStorageException} because it
- * is either called before all logs are loaded or the immediate caller
will catch and handle
- * IOException
- *
- * @param segmentsToDelete The log segments to schedule for deletion
- */
- private void removeAndDeleteSegments(Iterator<LogSegment>
segmentsToDelete) {
- if (segmentsToDelete.hasNext()) {
- List<LogSegment> toDelete = new ArrayList<>();
- segmentsToDelete.forEachRemaining(toDelete::add);
-
- LOG.info(
- "Deleting segments for bucket {} as part of log recovery:
{}",
- logSegments.getTableBucket(),
-
toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
- toDelete.forEach(segment ->
logSegments.remove(segment.getBaseOffset()));
-
- try {
- LocalLog.deleteSegmentFiles(
- toDelete,
LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
- } catch (IOException e) {
- LOG.error(
- "Failed to delete truncated segments {} for bucket {}",
- toDelete,
- logSegments.getTableBucket(),
- e);
- }
-
- try {
- LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
- } catch (IOException e) {
- LOG.error(
- "Failed to delete truncated writer snapshots {} for
bucket {}",
- toDelete,
- logSegments.getTableBucket(),
- e);
- }
- }
- }
-
/** Loads segments from disk into the provided segments. */
private void loadSegmentFiles() throws IOException {
File[] sortedFiles = logTabletDir.listFiles();
@@ -285,28 +156,8 @@ final class LogLoader {
}
} else if (LocalLog.isLogFile(file)) {
long baseOffset = FlussPaths.offsetFromFile(file);
- boolean timeIndexFileNewlyCreated =
- !FlussPaths.timeIndexFile(logTabletDir,
baseOffset).exists();
LogSegment segment =
LogSegment.open(logTabletDir, baseOffset,
conf, true, 0, logFormat);
-
- try {
- segment.sanityCheck(timeIndexFileNewlyCreated);
- } catch (IOException e) {
- if (e instanceof NoSuchFileException) {
- if (isCleanShutdown
- || segment.getBaseOffset() <
recoveryPointCheckpoint) {
- LOG.error(
- "Could not find offset index file
corresponding to log file {} "
- + "for bucket {},
recovering segment and rebuilding index files...",
- logSegments.getTableBucket(),
-
segment.getFileLogRecords().file().getAbsoluteFile());
- }
- recoverSegment(segment);
- } else {
- throw e;
- }
- }
logSegments.add(segment);
}
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
index 93bdbdb65..0a30d5f5f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
@@ -44,7 +44,6 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.io.File;
import java.io.IOException;
-import java.nio.file.NoSuchFileException;
import java.util.Optional;
import static
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
@@ -173,23 +172,6 @@ public final class LogSegment {
timeIndex().resize(size);
}
- public void sanityCheck(boolean timeIndexFileNewlyCreated) throws
IOException {
- if (lazyOffsetIndex.file().exists()) {
- // Resize the time index file to 0 if it is newly created.
- if (timeIndexFileNewlyCreated) {
- timeIndex().resize(0);
- }
- // Sanity checks for time index and offset index are skipped
because
- // we will recover the segments above the recovery point in
recoverLog()
- // in any case so sanity checking them here is redundant.
- } else {
- throw new NoSuchFileException(
- "Offset index file "
- + lazyOffsetIndex.file().getAbsolutePath()
- + " does not exist.");
- }
- }
-
/**
* The maximum timestamp we see so far.
*
@@ -302,7 +284,7 @@ public final class LogSegment {
* Run recovery on the given segment. This will rebuild the index from the
log file and lop off
* any invalid bytes from the end of the log and index.
*/
- public int recover() throws IOException {
+ public int recover() throws Exception {
offsetIndex().reset();
timeIndex().reset();
int validBytes = 0;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 8a5c54cbd..bf75410e4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -1283,7 +1283,7 @@ public final class LogTablet {
loadedWriters.values().forEach(writerStateManager::update);
}
- public static void deleteWriterSnapshots(
+ private static void deleteWriterSnapshots(
List<LogSegment> segments, WriterStateManager writerStateManager)
throws IOException {
for (LogSegment segment : segments) {
writerStateManager.removeAndDeleteSnapshot(segment.getBaseOffset());
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
index 30afb849a..4fa0baced 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
@@ -99,10 +99,6 @@ public class WriterStateManager {
this.snapshots = loadSnapshots();
}
- public int writerExpirationMs() {
- return writerExpirationMs;
- }
-
public int writerIdCount() {
return writerIdCount;
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
deleted file mode 100644
index 7cec74533..000000000
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.server.log;
-
-import org.apache.fluss.compression.ArrowCompressionInfo;
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.MemorySize;
-import org.apache.fluss.metadata.LogFormat;
-import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.LogTestBase;
-import org.apache.fluss.record.MemoryLogRecords;
-import org.apache.fluss.server.exception.CorruptIndexException;
-import org.apache.fluss.server.metrics.group.TestingMetricGroups;
-import org.apache.fluss.utils.clock.Clock;
-import org.apache.fluss.utils.clock.ManualClock;
-import org.apache.fluss.utils.clock.SystemClock;
-import org.apache.fluss.utils.concurrent.FlussScheduler;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
-import static
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link LogLoader}. */
-final class LogLoaderTest extends LogTestBase {
-
- private @TempDir File tempDir;
- private FlussScheduler scheduler;
- private File logDir;
- private Clock clock;
-
- @BeforeEach
- public void setup() throws Exception {
- conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE,
MemorySize.parse("10kb"));
- conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE,
MemorySize.parse("1b"));
-
- logDir =
- LogTestUtils.makeRandomLogTabletDir(
- tempDir,
- DATA1_TABLE_PATH.getDatabaseName(),
- DATA1_TABLE_ID,
- DATA1_TABLE_PATH.getTableName());
-
- scheduler = new FlussScheduler(1);
- scheduler.startup();
-
- clock = new ManualClock();
- }
-
- // TODO: add more tests like Kafka LogLoaderTest
-
- @Test
- void testCorruptIndexRebuild() throws Exception {
- // publish the records and close the log
- int numRecords = 200;
- LogTablet logTablet = createLogTablet(true);
- appendRecords(logTablet, numRecords);
- // collect all the index files
- List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
- logTablet.close();
-
- // corrupt all the index files
- for (File indexFile : indexFiles) {
- try (FileChannel fileChannel =
- FileChannel.open(indexFile.toPath(),
StandardOpenOption.APPEND)) {
- for (int i = 0; i < 12; i++) {
- fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
- }
- }
- }
-
- // test reopen the log without recovery, sanity check of index files
should throw exception
- logTablet = createLogTablet(true);
- for (LogSegment segment : logTablet.logSegments()) {
- if (segment.getBaseOffset() !=
logTablet.activeLogSegment().getBaseOffset()) {
- assertThatThrownBy(segment.offsetIndex()::sanityCheck)
- .isInstanceOf(CorruptIndexException.class)
- .hasMessage(
- String.format(
- "Index file %s is corrupt, found %d
bytes which is neither positive nor a multiple of %d",
-
segment.offsetIndex().file().getAbsolutePath(),
- segment.offsetIndex().length(),
- segment.offsetIndex().entrySize()));
- assertThatThrownBy(segment.timeIndex()::sanityCheck)
- .isInstanceOf(CorruptIndexException.class)
- .hasMessageContaining(
- String.format(
- "Corrupt time index found, time index
file (%s) has non-zero size but the last timestamp is 0 which is less than the
first timestamp",
-
segment.timeIndex().file().getAbsolutePath()));
- } else {
- // the offset index file of active segment will be resized,
which case no corruption
- // exception when doing sanity check
- segment.offsetIndex().sanityCheck();
- assertThatThrownBy(segment.timeIndex()::sanityCheck)
- .isInstanceOf(CorruptIndexException.class)
- .hasMessageContaining(
- String.format(
- "Corrupt time index found, time index
file (%s) has non-zero size but the last timestamp is 0 which is less than the
first timestamp",
-
segment.timeIndex().file().getAbsolutePath()));
- }
- }
- logTablet.close();
-
- // test reopen the log with recovery, sanity check of index files
should no corruption
- logTablet = createLogTablet(false);
- for (LogSegment segment : logTablet.logSegments()) {
- segment.offsetIndex().sanityCheck();
- segment.timeIndex().sanityCheck();
- }
- assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
- for (int i = 0; i < numRecords; i++) {
- assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds()
+ i * 10))
- .isEqualTo(i);
- }
- logTablet.close();
- }
-
- @Test
- void testCorruptIndexRebuildWithRecoveryPoint() throws Exception {
- // publish the records and close the log
- int numRecords = 200;
- LogTablet logTablet = createLogTablet(true);
- appendRecords(logTablet, numRecords);
- // collect all the index files
- long recoveryPoint = logTablet.localLogEndOffset() / 2;
- List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
- logTablet.close();
-
- // corrupt all the index files
- for (File indexFile : indexFiles) {
- try (FileChannel fileChannel =
- FileChannel.open(indexFile.toPath(),
StandardOpenOption.APPEND)) {
- for (int i = 0; i < 12; i++) {
- fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
- }
- }
- }
-
- // test reopen the log with recovery point
- logTablet = createLogTablet(false, recoveryPoint);
- List<LogSegment> logSegments = logTablet.logSegments(recoveryPoint,
Long.MAX_VALUE);
- assertThat(logSegments.size() <
logTablet.logSegments().size()).isTrue();
- Set<Long> recoveredSegments =
-
logSegments.stream().map(LogSegment::getBaseOffset).collect(Collectors.toSet());
- for (LogSegment segment : logTablet.logSegments()) {
- if (recoveredSegments.contains(segment.getBaseOffset())) {
- segment.offsetIndex().sanityCheck();
- segment.timeIndex().sanityCheck();
- } else {
- // the segments before recovery point will not be recovered,
so sanity check should
- // still throw corrupt exception
- assertThatThrownBy(segment.offsetIndex()::sanityCheck)
- .isInstanceOf(CorruptIndexException.class)
- .hasMessage(
- String.format(
- "Index file %s is corrupt, found %d
bytes which is neither positive nor a multiple of %d",
-
segment.offsetIndex().file().getAbsolutePath(),
- segment.offsetIndex().length(),
- segment.offsetIndex().entrySize()));
- assertThatThrownBy(segment.timeIndex()::sanityCheck)
- .isInstanceOf(CorruptIndexException.class)
- .hasMessageContaining(
- String.format(
- "Corrupt time index found, time index
file (%s) has non-zero size but the last timestamp is 0 which is less than the
first timestamp",
-
segment.timeIndex().file().getAbsolutePath()));
- }
- }
- }
-
- @Test
- void testIndexRebuild() throws Exception {
- // publish the records and close the log
- int numRecords = 200;
- LogTablet logTablet = createLogTablet(true);
- appendRecords(logTablet, numRecords);
- // collect all index files
- List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
- logTablet.close();
-
- // delete all the index files
- indexFiles.forEach(File::delete);
-
- // reopen the log
- logTablet = createLogTablet(false);
- assertThat(logTablet.localLogEndOffset()).isEqualTo(numRecords);
- // the index files should be rebuilt
- assertThat(logTablet.logSegments().get(0).offsetIndex().entries() >
0).isTrue();
- assertThat(logTablet.logSegments().get(0).timeIndex().entries() >
0).isTrue();
- for (int i = 0; i < numRecords; i++) {
- assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds()
+ i * 10))
- .isEqualTo(i);
- }
- logTablet.close();
- }
-
- @Test
- void testInvalidOffsetRebuild() throws Exception {
- // publish the records and close the log
- int numRecords = 200;
- LogTablet logTablet = createLogTablet(true);
- appendRecords(logTablet, numRecords);
-
- List<LogSegment> logSegments = logTablet.logSegments();
- int corruptSegmentIndex = logSegments.size() / 2;
- assertThat(corruptSegmentIndex < logSegments.size()).isTrue();
- LogSegment corruptSegment = logSegments.get(corruptSegmentIndex);
-
- // append an invalid offset batch
- List<Object[]> objects = Collections.singletonList(new Object[] {1,
"a"});
- List<ChangeType> changeTypes =
- objects.stream().map(row ->
ChangeType.APPEND_ONLY).collect(Collectors.toList());
- MemoryLogRecords memoryLogRecords =
- createBasicMemoryLogRecords(
- DATA1_ROW_TYPE,
- DEFAULT_SCHEMA_ID,
- corruptSegment.getBaseOffset(),
- clock.milliseconds(),
- magic,
- System.currentTimeMillis(),
- 0,
- changeTypes,
- objects,
- LogFormat.ARROW,
- ArrowCompressionInfo.DEFAULT_COMPRESSION);
- corruptSegment.getFileLogRecords().append(memoryLogRecords);
- logTablet.close();
-
- logTablet = createLogTablet(false);
- // the corrupt segment should be truncated to base offset
-
assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset());
- // segments after the corrupt segment should be removed
-
assertThat(logTablet.logSegments().size()).isEqualTo(corruptSegmentIndex + 1);
- }
-
- private LogTablet createLogTablet(boolean isCleanShutdown) throws
Exception {
- return createLogTablet(isCleanShutdown, 0);
- }
-
- private LogTablet createLogTablet(boolean isCleanShutdown, long
recoveryPoint)
- throws Exception {
- return LogTablet.create(
- PhysicalTablePath.of(DATA1_TABLE_PATH),
- logDir,
- conf,
- TestingMetricGroups.TABLET_SERVER_METRICS,
- recoveryPoint,
- scheduler,
- LogFormat.ARROW,
- 1,
- false,
- SystemClock.getInstance(),
- isCleanShutdown);
- }
-
- private void appendRecords(LogTablet logTablet, int numRecords) throws
Exception {
- int baseOffset = 0;
- int batchSequence = 0;
- for (int i = 0; i < numRecords; i++) {
- List<Object[]> objects = Collections.singletonList(new Object[]
{1, "a"});
- List<ChangeType> changeTypes =
- objects.stream()
- .map(row -> ChangeType.APPEND_ONLY)
- .collect(Collectors.toList());
- MemoryLogRecords memoryLogRecords =
- createBasicMemoryLogRecords(
- DATA1_ROW_TYPE,
- DEFAULT_SCHEMA_ID,
- baseOffset,
- clock.milliseconds() + i * 10L,
- magic,
- System.currentTimeMillis(),
- batchSequence,
- changeTypes,
- objects,
- LogFormat.ARROW,
- ArrowCompressionInfo.DEFAULT_COMPRESSION);
- logTablet.appendAsFollower(memoryLogRecords);
- baseOffset++;
- batchSequence++;
- }
- }
-
- private List<File> collectIndexFiles(List<LogSegment> logSegments) throws
IOException {
- List<File> indexFiles = new ArrayList<>();
- for (LogSegment segment : logSegments) {
- indexFiles.add(segment.offsetIndex().file());
- indexFiles.add(segment.timeIndex().file());
- }
- return indexFiles;
- }
-}