This is an automated email from the ASF dual-hosted git repository. mingmxu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 677beaf KafkaIO : fix a typo in method name. (#5106) 677beaf is described below commit 677beafba52e1b77b6a6d01c1b0c56fa55e49c00 Author: Raghu Angadi <rang...@apache.org> AuthorDate: Thu Apr 12 10:11:26 2018 -0700 KafkaIO : fix a typo in method name. (#5106) --- .../src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java | 2 +- .../main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java index f615ad6..4935986 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java @@ -101,7 +101,7 @@ class ConsumerSpEL { return -1L; // This is the timestamp used in Kafka for older messages without timestamps. } - public KafkaTimestampType getRecordTimestamptType( + public KafkaTimestampType getRecordTimestampType( ConsumerRecord<byte[], byte[]> rawRecord) { if (hasRecordTimestamp) { return KafkaTimestampType.forOrdinal(rawRecord.timestampType().ordinal()); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 533c8b3..e2c28d5 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -164,7 +164,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { } @Override - public boolean advance() throws IOException { + public boolean advance() { /* Read first record (if any). we need to loop here because : * - (a) some records initially need to be skipped if they are before consumedOffset * - (b) if curBatch is empty, we want to fetch next batch and then advance. @@ -211,7 +211,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { rawRecord.partition(), rawRecord.offset(), consumerSpEL.getRecordTimestamp(rawRecord), - consumerSpEL.getRecordTimestamptType(rawRecord), + consumerSpEL.getRecordTimestampType(rawRecord), keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()), valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value())); -- To stop receiving notification emails like this one, please contact ming...@apache.org.