EmmyMiao87 commented on a change in pull request #870: Optimize the consumer
assignment of Kafka routine load job
URL: https://github.com/apache/incubator-doris/pull/870#discussion_r271707624
##########
File path:
fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
##########
@@ -70,35 +74,61 @@ public void addPartitionOffset(Pair<Integer, Long>
partitionOffset) {
partitionIdToOffset.put(partitionOffset.first, partitionOffset.second);
}
+ public Long getOffsetByPartition(int kafkaPartition) {
+ return partitionIdToOffset.get(kafkaPartition);
+ }
+
+ public boolean containsPartition(Integer kafkaPartition) {
+ return partitionIdToOffset.containsKey(kafkaPartition);
+ }
+
+ public boolean hasPartition() {
+ return partitionIdToOffset.isEmpty();
+ }
+
// (partition id, end offset)
- // end offset = -1 while begin offset of partition is 0
- @Override
- public String toString() {
- Map<Integer, Long> showPartitionIdToOffset = new HashMap<>();
+ // OFFSET_ZERO: user set offset == 0, no committed msg
+ // OFFSET_END: user set offset = OFFSET_END, no committed msg
+ // OFFSET_BEGINNING: user set offset = OFFSET_BEGINNING, no committed msg
+ // other: current committed msg's offset
+ private void getReadableProgress(Map<Integer, String>
showPartitionIdToOffset) {
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
- showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1);
+ if (entry.getValue() == 0) {
+ showPartitionIdToOffset.put(entry.getKey(), OFFSET_ZERO);
+ } else if (entry.getValue() == -1) {
+ showPartitionIdToOffset.put(entry.getKey(), OFFSET_END);
Review comment:
What does OFFSET_END mean?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]