luoyuxia commented on code in PR #127:
URL: https://github.com/apache/fluss-rust/pull/127#discussion_r2675761771
##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -318,6 +321,27 @@ impl DefaultCompletedFetch {
}
}
}
+
+ /// Get the next batch directly without row iteration
+ fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
+ loop {
+ let Some(log_batch) = self.log_record_batch.next() else {
+ self.drain();
+ return Ok(None);
+ };
+
+ let record_batch = log_batch.record_batch(&self.read_context)?;
Review Comment:
We will still need the same logic as in `next_fetched_record`:
```
if record.offset() >= self.next_fetch_offset {
return Ok(Some(record));
}
```
Assume we write a record batch into Fluss whose offsets are 0 ~ 6.
Then we subscribe from offset 2, so we only want the records 2 ~ 6.
The server will always return the whole record batch 0 ~ 6, so here we will
always return 0 ~ 6 to users.
We will need to truncate the record batch to only return 2 ~ 6 to users.
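The truncation described above can be sketched as a small range computation. This is only an illustration under stated assumptions, not the PR's implementation: `visible_range`, `base_offset`, and `row_count` are hypothetical names, and the sketch assumes record offsets within a batch are contiguous starting at the batch's base offset. The resulting `(skip, len)` pair is the shape of argument that Arrow's `RecordBatch::slice(offset, length)` accepts.

```rust
/// Hypothetical helper (not part of fluss-rust): given a batch whose rows carry
/// the contiguous offsets `base_offset .. base_offset + row_count`, return
/// `(skip, len)` such that only rows with offset >= `next_fetch_offset` remain.
/// Returns `None` when every row lies before `next_fetch_offset`.
fn visible_range(
    base_offset: i64,
    row_count: usize,
    next_fetch_offset: i64,
) -> Option<(usize, usize)> {
    // Number of leading rows whose offset is below the requested start offset.
    let behind = next_fetch_offset.saturating_sub(base_offset).max(0) as u64;
    // Never skip more rows than the batch actually holds.
    let skip = behind.min(row_count as u64) as usize;
    let len = row_count - skip;
    if len == 0 {
        None
    } else {
        Some((skip, len))
    }
}

fn main() {
    // The batch covers offsets 0 ~ 6 (7 rows); the subscription starts at offset 2.
    let range = visible_range(0, 7, 2);
    println!("{:?}", range); // prints Some((2, 5)), i.e. offsets 2 ~ 6
}
```

A batch that lies entirely before the fetch offset yields `None`, so the caller could move on to the next batch in its loop instead of returning an empty one.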
##########
crates/fluss/tests/integration/table.rs:
##########
@@ -469,4 +469,439 @@ mod table_test {
records.sort_by_key(|r| r.offset());
records
}
+
+ #[tokio::test]
+ async fn test_poll_batches_basic() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(), "test_poll_batches_basic".to_string());
+
+ // Create table
+ let schema = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .build()
+ .expect("Failed to build schema");
+
+ let descriptor = TableDescriptor::builder()
+ .schema(schema)
+ .build()
+ .expect("Failed to build table descriptor");
+
+ create_table(&admin, &table_path, &descriptor).await;
+
+ // Wait for table to be ready
+ tokio::time::sleep(Duration::from_secs(1)).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
Review Comment:
Here, we can poll once to verify that polling returns an empty batch, so that
we won't need a separate test method `test_poll_batches_empty`.
##########
crates/fluss/tests/integration/table.rs:
##########
@@ -469,4 +469,439 @@ mod table_test {
records.sort_by_key(|r| r.offset());
records
}
+
+ #[tokio::test]
+ async fn test_poll_batches_basic() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(), "test_poll_batches_basic".to_string());
+
+ // Create table
+ let schema = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .build()
+ .expect("Failed to build schema");
+
+ let descriptor = TableDescriptor::builder()
+ .schema(schema)
+ .build()
+ .expect("Failed to build table descriptor");
+
+ create_table(&admin, &table_path, &descriptor).await;
+
+ // Wait for table to be ready
+ tokio::time::sleep(Duration::from_secs(1)).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
Review Comment:
Is it possible to combine the poll batch tests into one test method, or at
least fewer? I prefer fewer test methods because they are easier to read and
reduce test time.
##########
crates/fluss/tests/integration/table.rs:
##########
@@ -469,4 +469,439 @@ mod table_test {
records.sort_by_key(|r| r.offset());
records
}
+
+ #[tokio::test]
+ async fn test_poll_batches_basic() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(), "test_poll_batches_basic".to_string());
+
+ // Create table
+ let schema = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .build()
+ .expect("Failed to build schema");
+
+ let descriptor = TableDescriptor::builder()
+ .schema(schema)
+ .build()
+ .expect("Failed to build table descriptor");
+
+ create_table(&admin, &table_path, &descriptor).await;
+
+ // Wait for table to be ready
+ tokio::time::sleep(Duration::from_secs(1)).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
Review Comment:
I think we can also cover poll batch with projection in this test method.