leekeiabstraction commented on code in PR #300:
URL: https://github.com/apache/fluss-rust/pull/300#discussion_r2808097334
##########
website/docs/user-guide/python/example/log-tables.md:
##########
@@ -0,0 +1,122 @@
+---
+sidebar_position: 4
+---
+# Log Tables
+
+Log tables are append-only tables without primary keys, suitable for event streaming.
+
+## Creating a Log Table
+
+```python
+import pyarrow as pa
+
+schema = fluss.Schema(pa.schema([
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("score", pa.float32()),
+]))
+
+table_path = fluss.TablePath("fluss", "events")
+await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)
+```
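+
+The snippet above assumes an established connection (`conn`) and an admin client (`admin`). A minimal setup sketch is shown below; the bootstrap address and the exact entry-point names are illustrative assumptions, not confirmed API:
+
+```python
+import fluss
+
+# Hypothetical setup: substitute the actual connection API of your version
+conn = await fluss.connect("127.0.0.1:9123")  # assumed entry point
+admin = await conn.get_admin()                # assumed admin accessor
+```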
+
+## Writing
+
+Rows can be appended as dicts, lists, or tuples. For bulk writes, use `write_arrow()`, `write_arrow_batch()`, or `write_pandas()`.
+
+Write methods like `append()` and `write_arrow_batch()` return a `WriteResultHandle`. You can ignore it for fire-and-forget semantics (flush at the end), or `await handle.wait()` to block until the server acknowledges that specific write.
+
+```python
+table = await conn.get_table(table_path)
+writer = table.new_append().create_writer()
+
+# Fire-and-forget: queue writes, flush at the end
+writer.append({"id": 1, "name": "Alice", "score": 95.5})
+writer.append([2, "Bob", 87.0])
+await writer.flush()
+
+# Per-record acknowledgment
+handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0})
+await handle.wait()
+
+# Bulk writes
+writer.write_arrow(pa_table) # PyArrow Table
+writer.write_arrow_batch(record_batch) # PyArrow RecordBatch
+writer.write_pandas(df) # Pandas DataFrame
+await writer.flush()
+```
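+
+To get acknowledgment for a group of appends without flushing the whole writer, you can collect the handles and await them together. This is a sketch built only from the calls shown above, using `asyncio.gather` to await the acks concurrently:
+
+```python
+import asyncio
+
+# Queue several appends, keeping each WriteResultHandle
+handles = [
+    writer.append({"id": i, "name": f"user-{i}", "score": 0.0})
+    for i in range(4, 8)
+]
+
+# Block until the server has acknowledged every queued write
+await asyncio.gather(*(h.wait() for h in handles))
+```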
+
+## Reading
+
+There are two scanner types:
+- **Batch scanner** (`create_record_batch_log_scanner()`): returns Arrow Tables or DataFrames, best for analytics
+- **Record scanner** (`create_log_scanner()`): returns individual records with metadata (offset, timestamp, change type), best for streaming
+
+And two reading modes:
+- **`to_arrow()` / `to_pandas()`**: reads all data from subscribed buckets up to the current latest offset, then returns. Best for one-shot batch reads.
+- **`poll_arrow()` / `poll()` / `poll_record_batch()`**: returns whatever data is available within the timeout. Call it in a loop for continuous streaming.
+
+### Batch Read (One-Shot)
+
+```python
+num_buckets = (await admin.get_table_info(table_path)).num_buckets
+
+scanner = await table.new_scan().create_record_batch_log_scanner()
+scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+# Reads everything up to the current latest offset, then returns
+arrow_table = scanner.to_arrow()
+
+# Also a one-shot read, returned as a pandas DataFrame; the scanner's
+# position advances, so this continues from where to_arrow() stopped
+df = scanner.to_pandas()
+```
+
+### Continuous Polling
+
+Use `poll_arrow()` or `poll()` in a loop for streaming consumption:
+
+```python
+# Batch scanner: poll as Arrow Tables
+scanner = await table.new_scan().create_record_batch_log_scanner()
+scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
+
+while True:
+    result = scanner.poll_arrow(timeout_ms=5000)
+    if result.num_rows > 0:
+        print(result.to_pandas())
+
+# Record scanner: poll individual records with metadata
+scanner = await table.new_scan().create_log_scanner()
+scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+while True:
+    for record in scanner.poll(timeout_ms=5000):
+        print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}")
+```
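+
+For a bounded read, a small variation of the batch-scanner loop above stops once a poll comes back empty, i.e. once no new data arrived within the timeout:
+
+```python
+# Drain the subscribed buckets, stopping after an empty poll
+# (scanner here is the batch scanner subscribed above)
+while True:
+    result = scanner.poll_arrow(timeout_ms=5000)
+    if result.num_rows == 0:
+        break
+    print(result.to_pandas())
+```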
+
+### Unsubscribing
+
+To stop consuming from a bucket, use `unsubscribe()`:
+
+```python
+scanner.unsubscribe(bucket_id=0)
+```
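+
+For example, to drop every bucket that was subscribed with `subscribe_buckets()` earlier:
+
+```python
+# num_buckets as obtained from get_table_info() above
+for i in range(num_buckets):
+    scanner.unsubscribe(bucket_id=i)
+```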
+
+### Subscribe from Latest Offset
Review Comment:
Will leave it to Anton to update this as his PR deals with making Offset types consistent.