lukecwik commented on a change in pull request #15951:
URL: https://github.com/apache/beam/pull/15951#discussion_r749756335



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -376,6 +396,9 @@ public ProcessContinuation processElement(
             outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
           }
           receiver.outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
+          if (expectedOffset > endOffset) {

Review comment:
       Since the restriction is already bounded by `endOffset`, you don't need to check here as well: `tryClaim` above will return false.
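A minimal, self-contained sketch of why the extra check is redundant. It mimics the contract of Beam's `OffsetRangeTracker`, assuming a half-open restriction `[from, to)` in which `tryClaim(i)` returns `false` once `i >= to`. `SketchOffsetTracker` and `ClaimLoopSketch` are hypothetical names; this is not the actual `ReadFromKafkaDoFn` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Beam OffsetRangeTracker over the half-open range [from, to).
class SketchOffsetTracker {
  private final long to;
  private long lastClaimed;

  SketchOffsetTracker(long from, long to) {
    this.to = to;
    this.lastClaimed = from - 1;
  }

  // Returns false once the offset reaches the end of the restriction,
  // so the caller never needs its own "offset > endOffset" check.
  boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("Offsets must be claimed in increasing order");
    }
    if (offset >= to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }
}

class ClaimLoopSketch {
  // Emits every polled offset the tracker lets us claim, stopping at the first refusal.
  static List<Long> emit(List<Long> polledOffsets, SketchOffsetTracker tracker) {
    List<Long> emitted = new ArrayList<>();
    for (long offset : polledOffsets) {
      if (!tracker.tryClaim(offset)) {
        break; // Restriction exhausted; no separate end-offset comparison is required.
      }
      emitted.add(offset);
    }
    return emitted;
  }

  public static void main(String[] args) {
    SketchOffsetTracker tracker = new SketchOffsetTracker(5, 8);
    System.out.println(emit(List.of(5L, 6L, 7L, 8L, 9L), tracker)); // prints [5, 6, 7]
  }
}
```

In the real DoFn a `false` from `tryClaim` is where processing would stop, so the bound lives in exactly one place: the restriction.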

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java
##########
@@ -50,6 +50,14 @@
   @Nullable

Review comment:
       It seems like it should be an error to specify both `stop_read_time` and `stop_read_offset`, and similarly for `start_read_time` and `start_read_offset`.
   
   We can add some simple error checking within the static methods `of(...)` and `create(...)`.
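A minimal sketch of the suggested checks, assuming factory parameters named after the fields above. `DescriptorSketch` is hypothetical and models only the four read-bound fields; it uses `java.time.Instant` and plain exceptions so it compiles on its own, whereas Beam code typically uses Joda `Instant` and Guava's `checkArgument`.

```java
import java.time.Instant;

// Hypothetical, simplified descriptor: only the four read-bound fields are modeled.
final class DescriptorSketch {
  final Long startReadOffset;
  final Instant startReadTime;
  final Long stopReadOffset;
  final Instant stopReadTime;

  private DescriptorSketch(
      Long startReadOffset, Instant startReadTime, Long stopReadOffset, Instant stopReadTime) {
    this.startReadOffset = startReadOffset;
    this.startReadTime = startReadTime;
    this.stopReadOffset = stopReadOffset;
    this.stopReadTime = stopReadTime;
  }

  // Static factory that rejects ambiguous start/stop bounds before constructing the object.
  static DescriptorSketch of(
      Long startReadOffset, Instant startReadTime, Long stopReadOffset, Instant stopReadTime) {
    if (startReadOffset != null && startReadTime != null) {
      throw new IllegalArgumentException(
          "startReadOffset and startReadTime are mutually exclusive");
    }
    if (stopReadOffset != null && stopReadTime != null) {
      throw new IllegalArgumentException(
          "stopReadOffset and stopReadTime are mutually exclusive");
    }
    return new DescriptorSketch(startReadOffset, startReadTime, stopReadOffset, stopReadTime);
  }
}
```

Putting the check in the factory means every construction path (including `create(...)`, if it delegates to `of(...)`) gets the same validation.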

##########
File path: sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
##########
@@ -116,10 +118,14 @@ public void setCurrentPos(long pos) {
       this.currentPos = pos;
     }
 
-    public void setStartOffsetForTime(long pos) {
+    public void setStartOffsetForTime(KV<Long, Instant> pos) {

Review comment:
       Why not make this method take two parameters and construct its own internal representation using the KV as necessary?
   
   Please do the same for `setStopOffsetForTime`.
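A sketch of the suggested shape: the test helper takes the offset and the timestamp as separate arguments and builds the pair internally. `MockConsumerSketch` is hypothetical, and `Map.Entry` stands in for Beam's `KV` so the example compiles without Beam on the classpath.

```java
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical simplified mock; only the offset-for-time bookkeeping is modeled.
class MockConsumerSketch {
  private Map.Entry<Long, Instant> startOffsetForTime;
  private Map.Entry<Long, Instant> stopOffsetForTime;

  // Callers pass plain values; the pairing is an internal detail of the mock.
  void setStartOffsetForTime(long offset, Instant time) {
    this.startOffsetForTime = new AbstractMap.SimpleImmutableEntry<Long, Instant>(offset, time);
  }

  void setStopOffsetForTime(long offset, Instant time) {
    this.stopOffsetForTime = new AbstractMap.SimpleImmutableEntry<Long, Instant>(offset, time);
  }

  Map.Entry<Long, Instant> getStartOffsetForTime() {
    return startOffsetForTime;
  }

  Map.Entry<Long, Instant> getStopOffsetForTime() {
    return stopOffsetForTime;
  }
}
```

Test code then reads as `consumer.setStartOffsetForTime(3L, time)`, and the internal representation can change without touching every call site.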

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -990,6 +1003,19 @@ public void setTimestampPolicy(String timestampPolicy) {
       return toBuilder().setStartReadTime(startReadTime).build();
     }
 
+    /**
+     * Use timestamp to set up stop offset. It is only supported by Kafka Client 0.10.1.0 onwards
+     * and the message format version after 0.10.0.
+     *
+     * <p>This results in hard failures in either of the following two cases : 1. If one of more
+     * partitions do not contain any messages with timestamp larger than or equal to desired

Review comment:
       ```suggestion
        * <p>This results in hard failures in either of the following two cases : 1. If one or more
        * partitions do not contain any messages with timestamp larger than or equal to desired
   ```
   
   Can you also fix the same typo in `withStartReadTime`?
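The first failure case in this Javadoc can be illustrated with a minimal sketch. It assumes a timestamp lookup shaped like the result of Kafka's `Consumer.offsetsForTimes`, which maps a partition to `null` when that partition has no message with a timestamp at or after the target. Here the partition key is a plain `String` and the offset a `Long` so the example runs without the Kafka client; `StopOffsetSketch` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that turns a timestamp lookup into per-partition stop offsets.
final class StopOffsetSketch {
  static Map<String, Long> resolveStopOffsets(Map<String, Long> offsetsForTime) {
    Map<String, Long> stopOffsets = new LinkedHashMap<>();
    for (Map.Entry<String, Long> e : offsetsForTime.entrySet()) {
      if (e.getValue() == null) {
        // No message at or after the requested timestamp: fail hard rather than guess an offset.
        throw new IllegalStateException(
            "No message with timestamp >= stopReadTime in partition " + e.getKey());
      }
      stopOffsets.put(e.getKey(), e.getValue());
    }
    return stopOffsets;
  }
}
```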

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1235,6 +1261,15 @@ public void setTimestampPolicy(String timestampPolicy) {
                 + ". If you are building with maven, set \"kafka.clients.version\" "
                 + "maven property to 0.10.1.0 or newer.");
       }
+      if (getStopReadTime() != null) {
+        checkArgument(
+            ConsumerSpEL.hasOffsetsForTimes(),
+            "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
+                + "current version of Kafka Client is "
+                + AppInfoParser.getVersion()
+                + ". If you are building with maven, set \"kafka.clients.version\" "
+                + "maven property to 0.10.1.0 or newer.");
+      }

Review comment:
       Either implement support for `stopReadTime` within `ReadFromKafkaViaUnbounded` or add an error [here](https://github.com/apache/beam/blob/7d54488eca45743e379f1873eda7f6d390f97b50/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1266) stating that `stopReadTime` is only supported by the SDF implementation.
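A sketch of the second option: fail fast at construction time when the `UnboundedSource`-based path would otherwise silently ignore the bound. It assumes the expansion knows which implementation it chose; `ReadPathValidationSketch` and the `useSdf` flag are hypothetical, and the `IllegalArgumentException` mirrors the `checkArgument` style already used in this diff.

```java
import java.time.Instant;

// Hypothetical validation run when the read transform picks its implementation.
final class ReadPathValidationSketch {
  static void validateStopReadTime(Instant stopReadTime, boolean useSdf) {
    // Reject at pipeline construction instead of ignoring the bound at run time.
    if (stopReadTime != null && !useSdf) {
      throw new IllegalArgumentException(
          "stopReadTime is only supported by the SDF implementation of KafkaIO.Read");
    }
  }
}
```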

##########
File path: CHANGES.md
##########
@@ -76,6 +76,7 @@
   https://issues.apache.org/jira/browse/BEAM-11205)). For Google Cloud client library versions set by this BOM,
   see [this table](https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/24.0.0/artifact_details.html).
 * Removed avro-python3 dependency in AvroIO. Fastavro has already been our Avro library of choice on Python 3. Boolean use_fastavro is left for api compatibility, but will have no effect.([BEAM-13016](https://github.com/apache/beam/pull/15900)).
+* Support for stopReadTime on KafkaIO SDF (Java).([BEAM-13171](https://issues.apache.org/jira/browse/BEAM-13171)).

Review comment:
       It would make sense to list this under the I/Os section.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

