This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 164ee107c35 [MINOR] Fix typos in JavaScalaConverters and
KafkaOffsetGen (#12088)
164ee107c35 is described below
commit 164ee107c3516c8585d35fe56dee98b6400875f0
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun Oct 13 03:54:11 2024 -0700
[MINOR] Fix typos in JavaScalaConverters and KafkaOffsetGen (#12088)
---
.../main/scala/org/apache/hudi/util/JavaScalaConverters.scala | 6 +++---
.../apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java | 10 +++++-----
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JavaScalaConverters.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JavaScalaConverters.scala
index f359dfc293a..4a55bc7f2f0 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JavaScalaConverters.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JavaScalaConverters.scala
@@ -63,10 +63,10 @@ object JavaScalaConverters {
}
/**
- * @param javProperties properties in [[java.util.Properties]]
+ * @param javaProperties properties in [[java.util.Properties]]
* @return map in Scala [[Map]].
*/
- def convertJavaPropertiesToScalaMap(javProperties: java.util.Properties):
Map[String, String] = {
- javProperties.asScala.toMap
+ def convertJavaPropertiesToScalaMap(javaProperties: java.util.Properties):
Map[String, String] = {
+ javaProperties.asScala.toMap
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 8e5750752b6..b279f42feb7 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -27,8 +27,8 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
-
import org.apache.hudi.utilities.sources.HoodieRetryingKafkaConsumer;
+
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -115,7 +115,7 @@ public class KafkaOffsetGen {
* Compute the offset ranges to read from Kafka, while handling newly
added partitions, skews, event limits.
*
* @param fromOffsetMap offsets where we left off last time
- * @param toOffsetMap offsets of where each partitions is currently at
+ * @param toOffsetMap offsets of where each partition is currently at
* @param numEvents maximum number of events to read.
* @param minPartitions minimum partitions used for
*/
@@ -177,7 +177,7 @@ public class KafkaOffsetGen {
}
}
}
- // We need to ensure every partition is part of returned offset ranges
even if we are not consuming any new msgs (for instance, if its already caught
up).
+ // We need to ensure every partition is part of returned offset ranges
even if we are not consuming any new msgs (for instance, if it's already caught
up).
// as this will be tracked as the checkpoint, we need to ensure all
partitions are part of final ranges.
Map<TopicPartition, List<OffsetRange>> missedRanges =
fromOffsetMap.entrySet().stream()
.filter((kv) -> !finalRanges.containsKey(kv.getKey()))
@@ -411,8 +411,8 @@ public class KafkaOffsetGen {
Map<TopicPartition, Long> lastOffsets =
consumer.endOffsets(topicPartitions);
for (Map.Entry<TopicPartition, Long> entry : lastOffsets.entrySet()) {
- Long offect = checkpointOffsets.getOrDefault(entry.getKey(), 0L);
- delayCount += entry.getValue() - offect > 0 ? entry.getValue() - offect
: 0L;
+ Long offset = checkpointOffsets.getOrDefault(entry.getKey(), 0L);
+ delayCount += entry.getValue() - offset > 0 ? entry.getValue() - offset
: 0L;
}
return delayCount;
}