nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r670978737
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##########
@@ -212,6 +234,9 @@ public KafkaOffsetGen(TypedProperties props) {
     Set<TopicPartition> topicPartitions = partitionInfoList.stream()
         .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
+    if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidCheckpointType(lastCheckpointStr)) {
+      lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+    }
Review comment:
I was expecting an `else if` block after this line. Can you please clarify?
Without one, we might still fall through into the `else` block.
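
To make the question concrete, here is a minimal, self-contained sketch of one possible reading of the hunk: the new `if` only converts a timestamp checkpoint into an offset-style checkpoint, and the existing `if`/`else` that follows then picks the starting point. Everything in it is an assumption for illustration and not the PR's code: `looksLikeTimestamp` and `resolveTimestamp` are hypothetical stand-ins for `isValidCheckpointType` and `getOffsetsByTimestamp`, the `"timestamp"` constant value and the `topic,partition:offset` string format are assumed, and no Kafka consumer is involved.

```java
import java.util.Optional;

public class CheckpointFlowSketch {

  // Assumed value; the real Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP is not shown in the hunk.
  static final String CHECKPOINT_TYPE_TIMESTAMP = "timestamp";

  // Hypothetical stand-in for isValidCheckpointType: true when the checkpoint is a plain number.
  static boolean looksLikeTimestamp(Optional<String> checkpoint) {
    return checkpoint.isPresent() && checkpoint.get().matches("\\d{1,18}");
  }

  // Hypothetical stand-in for getOffsetsByTimestamp: fabricates an offset-style
  // checkpoint for partition 0 instead of querying a broker.
  static Optional<String> resolveTimestamp(String topic, long timestampMs) {
    return Optional.of(topic + ",0:" + (timestampMs % 1000));
  }

  // Step 1 normalizes a timestamp checkpoint; step 2 is the pre-existing if/else.
  static String chooseStart(String checkpointType, Optional<String> lastCheckpoint, String topic) {
    if (CHECKPOINT_TYPE_TIMESTAMP.equals(checkpointType) && looksLikeTimestamp(lastCheckpoint)) {
      lastCheckpoint = resolveTimestamp(topic, Long.parseLong(lastCheckpoint.get()));
    }
    if (lastCheckpoint.isPresent() && !lastCheckpoint.get().isEmpty()) {
      return "checkpoint:" + lastCheckpoint.get();
    } else {
      // No usable checkpoint: fall back to the consumer's auto.offset.reset policy.
      return "auto.offset.reset";
    }
  }

  public static void main(String[] args) {
    System.out.println(chooseStart("timestamp", Optional.of("1626300000123"), "topic1"));
    System.out.println(chooseStart("timestamp", Optional.empty(), "topic1"));
  }
}
```

Under this reading, a valid timestamp checkpoint never reaches the `else` branch, because the conversion leaves a non-empty checkpoint behind. If the conversion could return an empty value, the fall-through into `else` that the comment asks about would still happen, which is the case worth confirming.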
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
##########
@@ -64,7 +63,7 @@ public void teardown() throws Exception {
private TypedProperties getConsumerConfigs(String autoOffsetReset) {
TypedProperties props = new TypedProperties();
- props.put(Config.KAFKA_AUTO_OFFSET_RESET, autoOffsetReset);
+ props.put("auto.offset.reset", autoOffsetReset);
Review comment:
Do you think we can add some tests to this class for the timestamp type?
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
##########
@@ -193,7 +193,7 @@ public void testJsonKafkaSourceWithDefaultUpperCap() {
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- Config.maxEventsFromKafkaSource = 500;
+ //props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
Review comment:
Why is this line commented out?
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
##########
@@ -193,7 +193,7 @@ public void testJsonKafkaSourceWithDefaultUpperCap() {
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- Config.maxEventsFromKafkaSource = 500;
+ //props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
Review comment:
I tried your patch locally. The test fails if I uncomment this line, and I
don't understand why.