abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1285196135
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,387 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); + // TODO - Update comments below // It indicates whether the closing process has been started or not. If it is set as true, // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - + private volatile boolean isClosed = false; // It indicates whether the consumer needs to assign the partitions or not. This is set when it is // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + private volatile boolean isAssignmentChanged = true; Review Comment: This is required for the first time the ConsumerThread starts running. If the default value is set to false, the ConsumerTask will be busy executing the while loop (Line 116) without actually doing anything, until some assignment is received. Setting the default value to true prevents it from executing the while loop unnecessarily and makes it wait for an assignment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org