leekeiabstraction commented on code in PR #313:
URL: https://github.com/apache/fluss-rust/pull/313#discussion_r2805847394
##########
bindings/cpp/include/fluss.hpp:
##########
@@ -306,19 +306,19 @@ enum class DatumType {
constexpr int64_t EARLIEST_OFFSET = -2;
constexpr int64_t LATEST_OFFSET = -1;
-enum class OffsetSpec {
+enum class OffsetType {
Earliest = 0,
Latest = 1,
Timestamp = 2,
};
-struct OffsetQuery {
- OffsetSpec spec;
+struct OffsetSpec {
+ OffsetType type;
int64_t timestamp{0};
- static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; }
- static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; }
- static OffsetQuery FromTimestamp(int64_t ts) { return
{OffsetSpec::Timestamp, ts}; }
+ static OffsetSpec Earliest() { return {OffsetType::Earliest, 0}; }
+ static OffsetSpec Latest() { return {OffsetType::Latest, 0}; }
+ static OffsetSpec Timestamp(int64_t ts) { return {OffsetType::Timestamp,
ts}; }
Review Comment:
Should we make the names consistent with Java side e.g. EarlistSpec,
LatestSpec, TimestampSpec
```java
public static class EarliestSpec extends OffsetSpec {}
/** latest offset spec. */
public static class LatestSpec extends OffsetSpec {}
/** timestamp offset spec. */
public static class TimestampSpec extends OffsetSpec {
```
##########
bindings/python/fluss/__init__.pyi:
##########
@@ -839,12 +841,28 @@ class ErrorCode:
INVALID_ALTER_TABLE_EXCEPTION: int
DELETION_DISABLED_EXCEPTION: int
-class OffsetType:
- """Offset type constants for list_offsets()."""
+class OffsetSpec:
+ """Offset specification for list_offsets(), matching Java's OffsetSpec.
- EARLIEST: str
- LATEST: str
- TIMESTAMP: str
+ Use factory methods to create instances:
+ OffsetSpec.earliest()
+ OffsetSpec.latest()
+ OffsetSpec.timestamp(ts)
+ """
+
+ @staticmethod
+ def earliest() -> "OffsetSpec":
+ """Create an OffsetSpec for the earliest available offset."""
+ ...
+ @staticmethod
+ def latest() -> "OffsetSpec":
+ """Create an OffsetSpec for the latest available offset."""
+ ...
+ @staticmethod
+ def timestamp(ts: int) -> "OffsetSpec":
Review Comment:
Ditto on making it consistent with java side e.g. earliestSpec
##########
bindings/python/example/example.py:
##########
@@ -30,12 +30,12 @@
async def main():
# Create connection configuration
config_spec = {
- "bootstrap.servers": "127.0.0.1:9123",
+ "bootstrap_servers": "127.0.0.1:9123",
# Add other configuration options as needed
- "writer.request-max-size": "10485760", # 10 MB
- "writer.acks": "all", # Wait for all replicas to acknowledge
- "writer.retries": "3", # Retry up to 3 times on failure
- "writer.batch-size": "1000", # Batch size for writes
+ "writer_request_max_size": "10485760", # 10 MB
+ "writer_acks": "all", # Wait for all replicas to acknowledge
+ "writer_retries": "3", # Retry up to 3 times on failure
+ "writer_batch_size": "1000", # Batch size for writes
Review Comment:
Hmmm, I'm of the mind that we should keep config keys consistent even across
different languages.
I can for example envision a scenario where a Flink user using Table API in
Python attempts to read from Fluss table but refer to our documentation
(granted, it is a user error) and used the wrong key (because underlying table
api, Java connector is used).
The threading of configurations through these systems can be confusing e.g.
prefixes/namespaces, changing the delimiter across different language would
make it even more challenging.
##########
bindings/python/src/lib.rs:
##########
@@ -50,21 +50,53 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
.expect("Failed to create Tokio runtime")
});
-/// Offset type constants for list_offsets()
+/// Offset specification for list_offsets(), matching Java's OffsetSpec.
+///
+/// Use factory methods to create instances:
+/// OffsetSpec.earliest()
+/// OffsetSpec.latest()
+/// OffsetSpec.timestamp(ts)
#[pyclass]
#[derive(Clone)]
-pub struct OffsetType;
+pub struct OffsetSpec {
+ pub(crate) inner: fcore::rpc::message::OffsetSpec,
+}
#[pymethods]
-impl OffsetType {
- #[classattr]
- const EARLIEST: &'static str = "earliest";
+impl OffsetSpec {
+ /// Create an OffsetSpec for the earliest available offset.
+ #[staticmethod]
+ fn earliest() -> Self {
+ Self {
+ inner: fcore::rpc::message::OffsetSpec::Earliest,
+ }
+ }
+
+ /// Create an OffsetSpec for the latest available offset.
+ #[staticmethod]
+ fn latest() -> Self {
+ Self {
+ inner: fcore::rpc::message::OffsetSpec::Latest,
+ }
+ }
- #[classattr]
- const LATEST: &'static str = "latest";
+ /// Create an OffsetSpec for the offset at or after the given timestamp.
+ #[staticmethod]
+ fn timestamp(ts: i64) -> Self {
Review Comment:
Ditto on making it consistent with java side e.g. `earliestSpec`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]