This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 677beaf  KafkaIO : fix a typo in method name. (#5106)
677beaf is described below

commit 677beafba52e1b77b6a6d01c1b0c56fa55e49c00
Author: Raghu Angadi <rang...@apache.org>
AuthorDate: Thu Apr 12 10:11:26 2018 -0700

    KafkaIO : fix a typo in method name. (#5106)
---
 .../src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java      | 2 +-
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index f615ad6..4935986 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -101,7 +101,7 @@ class ConsumerSpEL {
     return -1L; // This is the timestamp used in Kafka for older messages 
without timestamps.
   }
 
-  public KafkaTimestampType getRecordTimestamptType(
+  public KafkaTimestampType getRecordTimestampType(
     ConsumerRecord<byte[], byte[]> rawRecord) {
     if (hasRecordTimestamp) {
       return 
KafkaTimestampType.forOrdinal(rawRecord.timestampType().ordinal());
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 533c8b3..e2c28d5 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -164,7 +164,7 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
   }
 
   @Override
-  public boolean advance() throws IOException {
+  public boolean advance() {
     /* Read first record (if any). we need to loop here because :
      *  - (a) some records initially need to be skipped if they are before 
consumedOffset
      *  - (b) if curBatch is empty, we want to fetch next batch and then 
advance.
@@ -211,7 +211,7 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
             rawRecord.partition(),
             rawRecord.offset(),
             consumerSpEL.getRecordTimestamp(rawRecord),
-            consumerSpEL.getRecordTimestamptType(rawRecord),
+            consumerSpEL.getRecordTimestampType(rawRecord),
             keyDeserializerInstance.deserialize(rawRecord.topic(), 
rawRecord.key()),
             valueDeserializerInstance.deserialize(rawRecord.topic(), 
rawRecord.value()));
 

-- 
To stop receiving notification emails like this one, please contact
ming...@apache.org.

Reply via email to