TheNeuralBit commented on a change in pull request #13779:
URL: https://github.com/apache/beam/pull/13779#discussion_r562281418
##########
File path: sdks/python/apache_beam/io/kafka.py
##########
@@ -152,8 +155,18 @@ def __init__(
for tests and demo applications.
:param max_read_time: Maximum amount of time in seconds the transform
executes. Mainly used for tests and demo applications.
+ :param commit_offset_in_finalize: Whether to commit offsets when finalizing.
+ :param timestamp_policy: The built-in timestamp policy which is used for
+ extracting timestamp from KafkaRecord.
:param expansion_service: The address (host:port) of the ExpansionService.
"""
+ if timestamp_policy not in [ReadFromKafka.processing_time_policy,
+ ReadFromKafka.create_time_policy,
+ ReadFromKafka.log_append_time]:
+ raise ValueError(
+ 'timestamp_policy should be one of '
+ '[ProcessingTime, CreateTime, LogAppendTime]')
Review comment:
nit: you might assign the allowed policies to a variable and re-use it
to generate this error message.
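The nit can be sketched as follows. This assumes the three `ReadFromKafka` policy attributes hold the strings `'ProcessingTime'`, `'CreateTime'` and `'LogAppendTime'` that the Java expansion code matches on; the diff does not show their values. `ReadFromKafkaSketch` and `allowed_timestamp_policies` are hypothetical names. The point is that one tuple drives both the membership check and the error message, so the two cannot drift apart.

```python
class ReadFromKafkaSketch(object):
  """Hypothetical stand-in for ReadFromKafka, showing only the validation."""

  # Assumed values: the strings the Java expansion code compares against.
  processing_time_policy = 'ProcessingTime'
  create_time_policy = 'CreateTime'
  log_append_time = 'LogAppendTime'

  # Single source of truth for both the membership check and the message.
  allowed_timestamp_policies = (
      processing_time_policy, create_time_policy, log_append_time)

  def __init__(self, timestamp_policy=processing_time_policy):
    if timestamp_policy not in self.allowed_timestamp_policies:
      # The message is generated from the same tuple used for the check.
      raise ValueError(
          'timestamp_policy should be one of [%s], got %r' %
          (', '.join(self.allowed_timestamp_policies), timestamp_policy))
    self.timestamp_policy = timestamp_policy
```

Adding a fourth policy then only means extending the tuple; the check and the message pick it up automatically.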
##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -586,8 +586,23 @@
setMaxReadTime(Duration.standardSeconds(config.maxReadTime));
}
setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE :
config.maxNumRecords);
- setCommitOffsetsInFinalizeEnabled(false);
- setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+
+ // Set committing offset configuration.
+ setCommitOffsetsInFinalizeEnabled(config.commitOffsetInFinalize);
+
+ // Set timestamp policy with built-in types.
+ String timestampPolicy = config.timestampPolicy;
+ if (timestampPolicy.equals("ProcessingTime")) {
+ setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+ } else if (timestampPolicy.equals("CreateTime")) {
+ setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO));
+ } else if (timestampPolicy.equals("LogAppendTime")) {
+ setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
+ } else {
+ throw new IllegalArgumentException(
+ "timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)");
+ }
Review comment:
:( sorry you have to add this logic for what's effectively an enumerated
type.
This makes me want to prioritize implementing the EnumerationType logical
type in portability and in Python. @chamikaramj, would the xlang folks be interested
in picking that up? It seems like enumerated types will be a common need for
external transform configuration.
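To illustrate what an enumerated type buys, here is a sketch using a plain Python `enum.Enum` from the standard library. This is not Beam's logical-type API: how an EnumerationType logical type would carry such values across the portability boundary to Java is not shown. Looking a member up by value already rejects unknown strings with a `ValueError`, and the allowed values can be derived from the type itself. `TimestampPolicy`, `parse_timestamp_policy` and `ALLOWED_VALUES` are hypothetical names.

```python
import enum


class TimestampPolicy(enum.Enum):
  """Hypothetical enumeration of the built-in Kafka timestamp policies."""
  PROCESSING_TIME = 'ProcessingTime'
  CREATE_TIME = 'CreateTime'
  LOG_APPEND_TIME = 'LogAppendTime'


def parse_timestamp_policy(value):
  """Maps a config string to a TimestampPolicy member.

  TimestampPolicy(value) raises ValueError for any string that is not one of
  the member values, so no hand-written branch per policy is needed.
  """
  return TimestampPolicy(value)


# The allowed values come from the type itself, in definition order.
ALLOWED_VALUES = [policy.value for policy in TimestampPolicy]
```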
----------------------------------------------------------------
This is an automated message from the Apache Git Service.