loserwang1024 opened a new issue, #2369: URL: https://github.com/apache/fluss/issues/2369
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Fluss version

0.8.0 (latest release)

### Please describe the bug 🐞

### Problem

When reading from a FirstRowMergeEngine table, the read operation occasionally gets stuck for several hours. During this period, the fetch delay suddenly increases from around 100 ms to several hours. Upon investigating the logs, I observed the following behavior: the fetch logs print continuously, but no data is actually read:

```
2026-01-12 20:13:57,856 INFO org.apache.fluss.client.table.scanner.log.LogFetcher [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
2026-01-12 20:13:57,860 INFO org.apache.fluss.client.table.scanner.log.LogFetcher [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
... (the same log line repeats) ...
2026-01-12 21:51:21,371 INFO org.apache.fluss.client.table.scanner.log.LogFetcher [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
2026-01-12 21:51:22,390 INFO org.apache.fluss.client.table.scanner.log.RemoteLogDownloader [] - Successfully downloaded remote log segment file 84cbd058-d9f7-487c-b401-e24c67da3c49_00000000000004368958.log to local cost 8718 ms.
2026-01-12 21:51:23,491 [DownloadRemoteLog-[ods.sales_flat_order_item_create]] INFO org.apache.fluss.client.table.scanner.log.RemoteLogDownloader [] - Consumed and deleted the fetched log segment file 84cbd058-d9f7-487c-b401-e24c67da3c49_00000000000004368958.log for bucket TableBucket{tableId=17, bucket=1}.
```

This indicates that fetch requests were sent repeatedly for over an hour without returning any data, which is unexpected because the log data does exist.

### Cause

After reviewing the code, I identified the following issue: in the FirstRowMergeEngine, an update operation generates an empty log batch (but with a valid offset). During projection pushdown queries, if the message content is empty, it gets filtered out ([fluss-417](https://github.com/apache/fluss/pull/417/files#diff-92c27dcaef32736733170903b9e1bf0e71cee1a230dcd4c04f1cbee13e799ae7)).

<img width="836" height="300" alt="Image" src="https://github.com/user-attachments/assets/09fe3bd8-3c33-4cbd-90d3-285156da30ca" />

If a read batch happens to contain only these empty messages, the returned result is also empty. This prevents the client from advancing its offset, so it repeatedly sends ineffective fetch requests. The issue persists until a remote log segment is generated; since the user's actual data volume is small, remote log generation can be slow, leaving certain partitions unreadable for extended periods. Note that no data is lost; data retrieval only becomes significantly delayed.

### Trigger Scenario

This issue is easily triggered under the following conditions. Assume we have two local log segments (one active, one inactive), and each read uses `client.scanner.log.fetch.max-bytes-for-bucket = 2000` bytes:

1. The first read starts at offset 100 and returns the data between offsets 100 and 200. The batch between offsets 201 and 202 is empty and therefore gets filtered out.
2. The second read starts at offset 201, but only empty messages are returned (each read operation processes one segment at a time).
3. The client then repeats the read from offset 201 until a remote log file is generated (see the sketch below).

<img width="1611" height="1191" alt="Image" src="https://github.com/user-attachments/assets/1be264f0-94e1-4c30-be6d-67a47ab042c9" />
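To make the stalling mechanism concrete, here is a minimal, self-contained sketch of the loop described above. All names and types in it are hypothetical stand-ins for illustration, not Fluss's actual classes: the point is only that when the fetch offset advances solely from records that survive projection, an all-empty (fully filtered) response leaves the offset unchanged, so the identical fetch is retried forever.

```java
import java.util.List;

// Hypothetical sketch of the stuck fetch loop; these are NOT Fluss classes.
public final class StuckFetchLoopSketch {

    // A log batch occupies an offset range even when projection leaves it empty.
    record Batch(long baseOffset, long nextOffset, int recordCount) {}

    public static void main(String[] args) {
        long fetchOffset = 201;
        for (int round = 0; round < 3; round++) {
            long before = fetchOffset;
            // Assumed server-side view: offsets 201..202 hold only empty batches,
            // so every fetch from 201 yields zero surviving records.
            List<Batch> response = List.of(new Batch(201, 203, 0));
            for (Batch batch : response) {
                // Buggy behavior: the offset only advances when records survive
                // projection, so an all-empty response leaves it unchanged.
                if (batch.recordCount() > 0) {
                    fetchOffset = batch.nextOffset();
                }
                // Advancing unconditionally to batch.nextOffset() would break
                // the loop, since each batch still carries a valid offset range.
            }
            System.out.printf("round %d: offset %d -> %d%n", round, before, fetchOffset);
            // Prints "offset 201 -> 201" every round: the client is stuck.
        }
    }
}
```

The commented-out alternative hints at the direction the cause analysis suggests: each batch already carries a valid offset range, so the client could advance past fully filtered batches without needing any of their records.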
### Min Reproduce Step

I have modified `org.apache.fluss.client.table.FlussTableITCase#testFirstRowMergeEngine`:

* If `doProjection = true`, the test gets stuck forever.
* If `doProjection = false`, it executes quickly.

```java
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testFirstRowMergeEngine(boolean doProjection) throws Exception {
    Configuration conf = initConfig();
    // To better mock the issue:
    // 1. Disable the remote log task so that we won't read remote log.
    // 2. Set LOG_SEGMENT_FILE_SIZE to make sure the segment before the last
    //    segment contains an empty batch at the end.
    // In this way, if empty batches are skipped, the read will be stuck forever.
    conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO);
    conf.set(
            ConfigOptions.LOG_SEGMENT_FILE_SIZE,
            new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE));
    conf.set(
            ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
            new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE));
    final FlussClusterExtension flussClusterExtension =
            FlussClusterExtension.builder()
                    .setNumOfTabletServers(3)
                    .setClusterConf(conf)
                    .build();
    flussClusterExtension.start();

    TableDescriptor tableDescriptor =
            TableDescriptor.builder()
                    .schema(DATA1_SCHEMA_PK)
                    .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW)
                    .build();
    RowType rowType = DATA1_SCHEMA_PK.getRowType();
    String tableName =
            String.format(
                    "test_first_row_merge_engine_with_%s",
                    doProjection ? "projection" : "no_projection");
    TablePath tablePath = TablePath.of("test_db_1", tableName);

    int rows = 5;
    int duplicateNum = 10;
    int batchSize = 3;
    int count = 0;

    // Case 1: Test normal updates to generate non-empty CDC logs.
    Table table = null;
    LogScanner logScanner = null;
    try (Connection connection =
                    ConnectionFactory.createConnection(
                            flussClusterExtension.getClientConfig());
            Admin admin = connection.getAdmin()) {
        admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, false)
                .get();
        admin.createTable(tablePath, tableDescriptor, false).get();
        table = connection.getTable(tablePath);

        // First, put rows.
        UpsertWriter upsertWriter = table.newUpsert().createWriter();
        List<InternalRow> expectedScanRows = new ArrayList<>(rows);
        List<InternalRow> expectedLookupRows = new ArrayList<>(rows);
        for (int id = 0; id < rows; id++) {
            for (int num = 0; num < duplicateNum; num++) {
                upsertWriter.upsert(row(id, "value_" + num));
                if (count++ > batchSize) {
                    upsertWriter.flush();
                    count = 0;
                }
            }
            expectedLookupRows.add(row(id, "value_0"));
            expectedScanRows.add(doProjection ? row(id) : row(id, "value_0"));
        }
        upsertWriter.flush();

        Lookuper lookuper = table.newLookup().createLookuper();
        // Now, get rows by lookup.
        for (int id = 0; id < rows; id++) {
            InternalRow gotRow = lookuper.lookup(row(id)).get().getSingletonRow();
            assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedLookupRows.get(id));
        }

        Scan scan = table.newScan();
        if (doProjection) {
            scan = scan.project(new int[] {0}); // do projection.
        }
        logScanner = scan.createLogScanner();
        logScanner.subscribeFromBeginning(0);
        List<ScanRecord> actualLogRecords = new ArrayList<>(0);
        while (actualLogRecords.size() < rows) {
            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
            scanRecords.forEach(actualLogRecords::add);
        }
        assertThat(actualLogRecords).hasSize(rows);
        for (int i = 0; i < actualLogRecords.size(); i++) {
            ScanRecord scanRecord = actualLogRecords.get(i);
            assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
            assertThatRow(scanRecord.getRow())
                    .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType)
                    .isEqualTo(expectedScanRows.get(i));
        }

        // Case 2: Test where all the updates in the write batch are duplicates
        // (thus generating empty CDC logs).
        // Insert duplicate rows again to generate empty CDC logs.
        for (int num = 0; num < duplicateNum; num++) {
            upsertWriter.upsert(row(0, "value_" + num));
            upsertWriter.flush();
        }
        // Insert a new row and flush so it is sent before polling.
        upsertWriter.upsert(row(rows + 1, "new_value"));
        upsertWriter.flush();

        actualLogRecords = new ArrayList<>(0);
        while (actualLogRecords.isEmpty()) {
            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
            scanRecords.forEach(actualLogRecords::add);
        }
        logScanner.close();
        assertThat(actualLogRecords).hasSize(1);
        ScanRecord scanRecord = actualLogRecords.get(0);
        assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
        assertThatRow(scanRecord.getRow())
                .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType)
                .isEqualTo(doProjection ? row(rows + 1) : row(rows + 1, "new_value"));
    } finally {
        if (logScanner != null) {
            logScanner.close();
        }
        if (table != null) {
            table.close();
        }
        flussClusterExtension.close();
    }
}
```

### Solution

_No response_

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
