vinothchandar commented on code in PR #6386:
URL: https://github.com/apache/hudi/pull/6386#discussion_r945392427
##########
hudi-utilities/pom.xml:
##########
@@ -171,6 +171,14 @@
</exclusions>
</dependency>
+ <!-- Pulsar Spark Connector -->
+ <dependency>
Review Comment:
Does the bundle need to change too?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -338,6 +339,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis());
+ // TODO revisit (too early to unpersist)
Review Comment:
Let's remove all extraneous minor changes from this PR.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -110,11 +110,13 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
// Create initial offset ranges for each 'to' partition, with from = to offsets.
- OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
- toOffsetMap.keySet().stream().map(tp -> {
+ OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> {
Review Comment:
Let's do these in a separate PR?
##########
pom.xml:
##########
@@ -97,7 +97,9 @@
<fasterxml.spark3.version>2.10.0</fasterxml.spark3.version>
<kafka.version>2.0.0</kafka.version>
<kafka.spark3.version>2.4.1</kafka.spark3.version>
- <pulsar.version>2.8.1</pulsar.version>
+ <pulsar.version>2.10.1</pulsar.version>
Review Comment:
Do the Pulsar callbacks or existing functionality around Pulsar still work?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -827,7 +827,7 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
* Close all resources.
*/
public void close() {
- if (null != deltaSync) {
+ if (deltaSync != null) {
Review Comment:
Same here: this is an unrelated minor change.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -110,11 +110,13 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
// Create initial offset ranges for each 'to' partition, with from = to offsets.
- OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
- toOffsetMap.keySet().stream().map(tp -> {
+ OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> {
long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
return OffsetRange.create(tp, fromOffset, fromOffset);
- }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
+ })
+ .sorted(byPartition)
Review Comment:
Please revert this formatting change.