This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 669841ea2 [server] Fix FirstRowMergeEngine hang caused by empty log
entries in projection pushdown queries (#2370)
669841ea2 is described below
commit 669841ea2bcbe29034c59e75156d362af0fd22d6
Author: Hongshun Wang <[email protected]>
AuthorDate: Sat Jan 24 11:31:44 2026 +0800
[server] Fix FirstRowMergeEngine hang caused by empty log entries in
projection pushdown queries (#2370)
---
.../client/admin/CustomFlussClusterITCase.java | 204 +++++++++++++++++++++
.../org/apache/fluss/record/FileLogProjection.java | 6 +-
.../org/apache/fluss/server/kv/KvTabletTest.java | 74 ++++----
3 files changed, 239 insertions(+), 45 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/CustomFlussClusterITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/CustomFlussClusterITCase.java
new file mode 100644
index 000000000..28b725595
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/CustomFlussClusterITCase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.admin;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.Scan;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.MergeEngineType;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.InternalRowAssert.assertThatRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for tests that require manual cluster management. */
+class CustomFlussClusterITCase {
+
+ @Test
+ void testProjectionPushdownWithEmptyBatches() throws Exception {
+ Configuration conf = initConfig();
+ // Configuration to reproduce the issue described in
+ // https://github.com/apache/fluss/issues/2369:
+ // 1. Disable remote log task to prevent reading from remote log
storage.
+ // 2. Set LOG_SEGMENT_FILE_SIZE to ensure that the segment before the
last segment
+ // contains an empty batch at the end.
+ // This setup causes the scanner to block indefinitely if it
incorrectly skips empty batches
+ // during projection pushdown, as it will wait forever for non-empty
data that never
+ // arrives.
+ conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION,
Duration.ZERO);
+ conf.set(
+ ConfigOptions.LOG_SEGMENT_FILE_SIZE,
+ new MemorySize(2 * V0_RECORD_BATCH_HEADER_SIZE));
+ conf.set(
+ ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
+ new MemorySize(2 * V0_RECORD_BATCH_HEADER_SIZE));
+ final FlussClusterExtension flussClusterExtension =
+ FlussClusterExtension.builder()
+ .setNumOfTabletServers(3)
+ .setClusterConf(conf)
+ .build();
+ flussClusterExtension.start();
+
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(DATA1_SCHEMA_PK)
+ .property(ConfigOptions.TABLE_MERGE_ENGINE,
MergeEngineType.FIRST_ROW)
+ .distributedBy(1)
+ .build();
+ RowType rowType = DATA1_SCHEMA_PK.getRowType();
+ TablePath tablePath =
+ TablePath.of("test_db_1",
"test_projection_pushdown_with_empty_batches");
+
+ int rows = 5;
+ int duplicateNum = 2;
+ int batchSize = 3;
+ int count = 0;
+        // Case1: Test a normal update to generate non-empty cdc logs.
+ Table table = null;
+ LogScanner logScanner = null;
+ try (Connection connection =
+ ConnectionFactory.createConnection(
+ flussClusterExtension.getClientConfig());
+ Admin admin = connection.getAdmin()) {
+ admin.createDatabase(tablePath.getDatabaseName(),
DatabaseDescriptor.EMPTY, false)
+ .get();
+ admin.createTable(tablePath, tableDescriptor, false).get();
+ table = connection.getTable(tablePath);
+ // first, put rows
+ UpsertWriter upsertWriter = table.newUpsert().createWriter();
+ List<InternalRow> expectedScanRows = new ArrayList<>(rows);
+ List<InternalRow> expectedLookupRows = new ArrayList<>(rows);
+ for (int id = 0; id < rows; id++) {
+ for (int num = 0; num < duplicateNum; num++) {
+ upsertWriter.upsert(row(id, "value_" + num));
+ if (count++ > batchSize) {
+ upsertWriter.flush();
+ count = 0;
+ }
+ }
+
+ expectedLookupRows.add(row(id, "value_0"));
+ expectedScanRows.add(row(id));
+ }
+
+ upsertWriter.flush();
+
+ Lookuper lookuper = table.newLookup().createLookuper();
+ // now, get rows by lookup
+ for (int id = 0; id < rows; id++) {
+ InternalRow gotRow =
lookuper.lookup(row(id)).get().getSingletonRow();
+
assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedLookupRows.get(id));
+ }
+
+ Scan scan = table.newScan().project(new int[] {0});
+ logScanner = scan.createLogScanner();
+
+ logScanner.subscribeFromBeginning(0);
+ List<ScanRecord> actualLogRecords = new ArrayList<>(0);
+ while (actualLogRecords.size() < rows) {
+ ScanRecords scanRecords =
logScanner.poll(Duration.ofSeconds(1));
+ scanRecords.forEach(actualLogRecords::add);
+ }
+ assertThat(actualLogRecords).hasSize(rows);
+ for (int i = 0; i < actualLogRecords.size(); i++) {
+ ScanRecord scanRecord = actualLogRecords.get(i);
+
assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
+ assertThatRow(scanRecord.getRow())
+ .withSchema(rowType.project(new int[] {0}))
+ .isEqualTo(expectedScanRows.get(i));
+ }
+
+            // Case2: Test when all the updates in the write batch are
duplicates (thus generating
+            // empty cdc logs).
+ // insert duplicate rows again to generate empty cdc log.
+ for (int num = 0; num < duplicateNum; num++) {
+ upsertWriter.upsert(row(0, "value_" + num));
+ upsertWriter.flush();
+ }
+
+ // insert a new row.
+ upsertWriter.upsert(row(rows + 1, "new_value"));
+
+ actualLogRecords = new ArrayList<>(0);
+ while (actualLogRecords.isEmpty()) {
+ ScanRecords scanRecords =
logScanner.poll(Duration.ofSeconds(1));
+ scanRecords.forEach(actualLogRecords::add);
+ }
+ logScanner.close();
+ assertThat(actualLogRecords).hasSize(1);
+ ScanRecord scanRecord = actualLogRecords.get(0);
+
assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
+ assertThatRow(scanRecord.getRow())
+ .withSchema(rowType.project(new int[] {0}))
+ .isEqualTo(row(rows + 1));
+ } finally {
+ if (logScanner != null) {
+ logScanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ flussClusterExtension.close();
+ }
+ }
+
+ protected static Configuration initConfig() {
+ Configuration conf = new Configuration();
+ conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
+ // set a shorter interval for testing purpose
+ conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
+ // set default datalake format for the cluster and enable datalake
tables
+ conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON);
+
+ conf.setString("datalake.paimon.jdbc.user", "admin");
+ conf.setString("datalake.paimon.jdbc.password", "pass");
+
+ conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE,
MemorySize.parse("1mb"));
+ conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE,
MemorySize.parse("1kb"));
+ conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
+ conf.set(ConfigOptions.MAX_BUCKET_NUM, 30);
+
+ conf.set(ConfigOptions.NETTY_CLIENT_NUM_NETWORK_THREADS, 1);
+ return conf;
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
index 4717271c2..7a4cfe55d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
@@ -140,7 +140,7 @@ public class FileLogProjection {
// So we use V0 header size here for a conservative judgment. In the
end, the condition
// of (position >= end - recordBatchHeaderSize) will ensure the final
correctness.
while (maxBytes > V0_RECORD_BATCH_HEADER_SIZE) {
- if (position >= end - V0_RECORD_BATCH_HEADER_SIZE) {
+ if (position > end - V0_RECORD_BATCH_HEADER_SIZE) {
// the remaining bytes in the file are not enough to read a
batch header up to
// magic.
return new BytesViewLogRecords(builder.build());
@@ -166,10 +166,12 @@ public class FileLogProjection {
return new BytesViewLogRecords(builder.build());
}
- // Skip empty batch. The empty batch was generated when build cdc
log batch when there
+        // Return the empty batch to push forward the log offset. The empty
batch was generated
+        // while building the cdc log batch when there
// is no cdc log generated for this kv batch. See the comments
about the field
// 'lastOffsetDelta' in DefaultLogRecordBatch.
if (batchSizeInBytes == recordBatchHeaderSize) {
+ builder.addBytes(channel, position, batchSizeInBytes);
position += batchSizeInBytes;
continue;
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 34f50691a..1f0a8ec06 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -773,26 +773,20 @@ class KvTabletTest {
kvTablet.putAsLeader(kvRecordBatch2, null);
endOffset = logTablet.localLogEndOffset();
assertThat(endOffset).isEqualTo(offsetBefore + i + 1);
+ MemoryLogRecords emptyLogs =
+ logRecords(
+ readLogRowType,
+ offsetBefore + i,
+ Collections.emptyList(),
+ Collections.emptyList());
+ MultiBytesView bytesView =
+ MultiBytesView.builder()
+ .addBytes(
+ expectedLogs.getMemorySegment(), 0,
expectedLogs.sizeInBytes())
+ .addBytes(emptyLogs.getMemorySegment(), 0,
emptyLogs.sizeInBytes())
+ .build();
+ expectedLogs = MemoryLogRecords.pointToBytesView(bytesView);
- // the empty batch will be read if no projection,
- // the empty batch will not be read if has projection
- if (!doProjection) {
- MemoryLogRecords emptyLogs =
- logRecords(
- readLogRowType,
- offsetBefore + i,
- Collections.emptyList(),
- Collections.emptyList());
- MultiBytesView bytesView =
- MultiBytesView.builder()
- .addBytes(
- expectedLogs.getMemorySegment(),
- 0,
- expectedLogs.sizeInBytes())
- .addBytes(emptyLogs.getMemorySegment(), 0,
emptyLogs.sizeInBytes())
- .build();
- expectedLogs = MemoryLogRecords.pointToBytesView(bytesView);
- }
actualLogRecords = readLogRecords(logTablet, 0, logProjection);
assertThatLogRecords(actualLogRecords)
.withSchema(readLogRowType)
@@ -969,31 +963,25 @@ class KvTabletTest {
endOffset = logTablet.localLogEndOffset();
assertThat(endOffset).isEqualTo(offsetBefore + i + 1);
- // the empty batch will be read if no projection,
- // the empty batch will not be read if has projection
- if (!doProjection) {
- MemoryLogRecords emptyLogs =
- logRecords(
- readLogRowType,
- offsetBefore + i,
- Collections.emptyList(),
- Collections.emptyList());
- MultiBytesView bytesView =
- MultiBytesView.builder()
- .addBytes(
- expectedLogs.getMemorySegment(),
- 0,
- expectedLogs.sizeInBytes())
- .addBytes(emptyLogs.getMemorySegment(), 0,
emptyLogs.sizeInBytes())
- .build();
- expectedLogs = MemoryLogRecords.pointToBytesView(bytesView);
- }
- actualLogRecords = readLogRecords(logTablet, 0, logProjection);
- assertThatLogRecords(actualLogRecords)
- .withSchema(readLogRowType)
- .assertCheckSum(!doProjection)
- .isEqualTo(expectedLogs);
+ MemoryLogRecords emptyLogs =
+ logRecords(
+ readLogRowType,
+ offsetBefore + i,
+ Collections.emptyList(),
+ Collections.emptyList());
+ MultiBytesView bytesView =
+ MultiBytesView.builder()
+ .addBytes(
+ expectedLogs.getMemorySegment(), 0,
expectedLogs.sizeInBytes())
+ .addBytes(emptyLogs.getMemorySegment(), 0,
emptyLogs.sizeInBytes())
+ .build();
+ expectedLogs = MemoryLogRecords.pointToBytesView(bytesView);
}
+ actualLogRecords = readLogRecords(logTablet, 0, logProjection);
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(readLogRowType)
+ .assertCheckSum(!doProjection)
+ .isEqualTo(expectedLogs);
List<KvRecord> kvData3 =
Arrays.asList(