loserwang1024 opened a new issue, #2369:
URL: https://github.com/apache/fluss/issues/2369

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Fluss version
   
   0.8.0 (latest release)
   
   ### Please describe the bug 🐞
   
   ### Problem
   
   When reading from a FirstRowMergeEngine table, the read operation 
occasionally gets stuck for several hours. During this period, the fetch delay 
suddenly increases from around 100ms to several hours. Upon investigating the 
logs, I observed the following behavior:
   
   The fetch logs continuously print, but no data is actually read:
   
   ```java
   2026-01-12 20:13:57,856 INFO  
org.apache.fluss.client.table.scanner.log.LogFetcher         [] - Preparing 
fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
   2026-01-12 20:13:57,860 INFO  
org.apache.fluss.client.table.scanner.log.LogFetcher         [] - Preparing 
fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
   
   .... the repeated log...
   2026-01-12 21:51:21,371  INFO  
org.apache.fluss.client.table.scanner.log.LogFetcher         [] - Preparing 
fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
   
   2026-01-12 21:51:22,390  INFO  
org.apache.fluss.client.table.scanner.log.RemoteLogDownloader [] - Successfully 
downloaded remote log segment file  
84cbd058-d9f7-487c-b401-e24c67da3c49_00000000000004368958.log to local cost 
8718 ms.
   026-01-12 21:51:23,491 
[DownloadRemoteLog-[ods.sales_flat_order_item_create]] INFO  
org.apache.fluss.client.table.scanner.log.RemoteLogDownloader [] - Consumed and 
deleted the fetched log segment file 
84cbd058-d9f7-487c-b401-e24c67da3c49_00000000000004368958.log for bucket 
TableBucket{tableId=17, bucket=1}.
   ```
   
   This indicates that the fetch requests were repeatedly sent for over an hour 
without returning any data. This is unexpected because the logs do exist.
   
   ### Cause
   After reviewing the code, I identified the following issue:
   In the FirstRowMergeEngine, when an update operation occurs, it generates an 
empty log entry (but with a valid offset). During projection pushdown queries, 
if the message content is empty, it gets filtered out 
([fluss-417](https://github.com/apache/fluss/pull/417/files#diff-92c27dcaef32736733170903b9e1bf0e71cee1a230dcd4c04f1cbee13e799ae7)).
   
   <img width="836" height="300" alt="Image" 
src="https://github.com/user-attachments/assets/09fe3bd8-3c33-4cbd-90d3-285156da30ca";
 />
   
   
   If a read batch happens to contain only these empty messages, the result 
returned will also be empty. This prevents the client from advancing the 
offset, causing it to repeatedly send ineffective fetch requests. The issue 
persists until a remote log segment is generated. However, since the user's 
actual data volume is small, generating remote logs can be slow, leading to 
scenarios where certain partitions remain unreadable for extended periods.
   
   It is important to note that no data is lost; however, data retrieval 
becomes significantly delayed.
   
   ### trigger Scenario
   This issue is easily triggered under the following conditions:
   
   Assume we have two local log segments(one is active , another is not 
active). Each time we read with `client.scanner.log.fetch.max-bytes-for-bucket 
= 2000 Bytes`
   1. The first read starts at offset 100, returning data between offsets 100 
and 200. The data between offsets 201 and 202 is empty and thus gets filtered 
out.
   2. The second read starts at offset 201, but only empty messages are 
returned (since each read operation processes one segment at a time).
   3. then will alway repeat reading starts at offset 201 until remote file is 
generated.
   
   
   It's easily to trigger in such a situation
   
   Assume we have two local segments  now, if we read eacth time with 2500 
Bytes. The first time  the start offset is 100, thus will retuen the data 
between 100 and 200( the data between 201 and 202 is emtpy,  thus will be 
filtered. The secord time, the start offset is 201, but only emtpy will be 
return( each time we only read one segment)
   <img width="1611" height="1191" alt="Image" 
src="https://github.com/user-attachments/assets/1be264f0-94e1-4c30-be6d-67a47ab042c9";
 />
   
   ### Min Reproduce Step
   I have modify 
org.apache.fluss.client.table.FlussTableITCase#testFirstRowMergeEngine.
   * If doProjection = true, will be stuck forever.
   * If  doProjection = false, will execute quickily.
   
   ```java
       @ParameterizedTest
       @ValueSource(booleans = {true, false})
       void testFirstRowMergeEngine(boolean doProjection) throws Exception {
           Configuration conf = initConfig();
           // To better mock the issue: 
           // 1. disable remote log task so that won't read remote log.
           // 2. Set LOG_SEGMENT_FILE_SIZE to make sure one segment before last 
segment is contain
           // empty batch at the end.
           //  In this way, if skip empty batch, the read will in stuck forever.
           conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, 
Duration.ZERO);
           conf.set(
                   ConfigOptions.LOG_SEGMENT_FILE_SIZE,
                   new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE));
           conf.set(
                   ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
                   new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE));
           final FlussClusterExtension flussClusterExtension =
                   FlussClusterExtension.builder()
                           .setNumOfTabletServers(3)
                           .setClusterConf(conf)
                           .build();
           flussClusterExtension.start();
   
           TableDescriptor tableDescriptor =
                   TableDescriptor.builder()
                           .schema(DATA1_SCHEMA_PK)
                           .property(ConfigOptions.TABLE_MERGE_ENGINE, 
MergeEngineType.FIRST_ROW)
                           .build();
           RowType rowType = DATA1_SCHEMA_PK.getRowType();
           String tableName =
                   String.format(
                           "test_first_row_merge_engine_with_%s",
                           doProjection ? "projection" : "no_projection");
           TablePath tablePath = TablePath.of("test_db_1", tableName);
   
           int rows = 5;
           int duplicateNum = 10;
           int batchSize = 3;
           int count = 0;
           // Case1: Test normal update to generator not empty cdc logs.
           Table table = null;
           LogScanner logScanner = null;
           try (Connection connection =
                           ConnectionFactory.createConnection(
                                   flussClusterExtension.getClientConfig());
                   Admin admin = connection.getAdmin()) {
               admin.createDatabase(tablePath.getDatabaseName(), 
DatabaseDescriptor.EMPTY, false)
                       .get();
               admin.createTable(tablePath, tableDescriptor, false).get();
               table = connection.getTable(tablePath);
               // first, put rows
               UpsertWriter upsertWriter = table.newUpsert().createWriter();
               List<InternalRow> expectedScanRows = new ArrayList<>(rows);
               List<InternalRow> expectedLookupRows = new ArrayList<>(rows);
               for (int id = 0; id < rows; id++) {
                   for (int num = 0; num < duplicateNum; num++) {
                       upsertWriter.upsert(row(id, "value_" + num));
                       if (count++ > batchSize) {
                           upsertWriter.flush();
                           count = 0;
                       }
                   }
   
                   expectedLookupRows.add(row(id, "value_0"));
                   expectedScanRows.add(doProjection ? row(id) : row(id, 
"value_0"));
               }
   
               upsertWriter.flush();
   
               Lookuper lookuper = table.newLookup().createLookuper();
               // now, get rows by lookup
               for (int id = 0; id < rows; id++) {
                   InternalRow gotRow = 
lookuper.lookup(row(id)).get().getSingletonRow();
                   
assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedLookupRows.get(id));
               }
   
               Scan scan = table.newScan();
               if (doProjection) {
                   scan = scan.project(new int[] {0}); // do projection.
               }
               logScanner = scan.createLogScanner();
   
               logScanner.subscribeFromBeginning(0);
               List<ScanRecord> actualLogRecords = new ArrayList<>(0);
               while (actualLogRecords.size() < rows) {
                   ScanRecords scanRecords = 
logScanner.poll(Duration.ofSeconds(1));
                   scanRecords.forEach(actualLogRecords::add);
               }
               assertThat(actualLogRecords).hasSize(rows);
               for (int i = 0; i < actualLogRecords.size(); i++) {
                   ScanRecord scanRecord = actualLogRecords.get(i);
                   
assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
                   assertThatRow(scanRecord.getRow())
                           .withSchema(doProjection ? rowType.project(new int[] 
{0}) : rowType)
                           .isEqualTo(expectedScanRows.get(i));
               }
   
               // Case2: Test all the update in the write batch are 
duplicate(Thus generate empty cdc
               // logs).
               // insert duplicate rows again to generate empty cdc log.
               for (int num = 0; num < duplicateNum; num++) {
                   upsertWriter.upsert(row(0, "value_" + num));
                   upsertWriter.flush();
               }
   
               // insert a new row.
               upsertWriter.upsert(row(rows + 1, "new_value"));
   
               actualLogRecords = new ArrayList<>(0);
               while (actualLogRecords.isEmpty()) {
                   ScanRecords scanRecords = 
logScanner.poll(Duration.ofSeconds(1));
                   scanRecords.forEach(actualLogRecords::add);
               }
               logScanner.close();
               assertThat(actualLogRecords).hasSize(1);
               ScanRecord scanRecord = actualLogRecords.get(0);
               
assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
               assertThatRow(scanRecord.getRow())
                       .withSchema(doProjection ? rowType.project(new int[] 
{0}) : rowType)
                       .isEqualTo(doProjection ? row(rows + 1) : row(rows + 1, 
"new_value"));
           } finally {
               if (logScanner != null) {
                   logScanner.close();
               }
               if (table != null) {
                   table.close();
               }
               flussClusterExtension.close();
           }
       }
   ```
   
   ### Solution
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to