showuon commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1299492523
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + + private final int numMetadataTopicPartitions = 5; + private final 
RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); + private final DummyEventHandler handler = new DummyEventHandler(); + private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() + .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); + private final Uuid topicId = Uuid.randomUuid(); + private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + + private ConsumerTask consumerTask; + private MockConsumer<byte[], byte[]> consumer; + private Thread thread; + + @BeforeEach + public void beforeEach() { + final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream() + .collect(Collectors.toMap(Function.identity(), e -> 0L)); + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updateBeginningOffsets(offsets); + consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, new SystemTime()); + thread = new Thread(consumerTask); + } + + @AfterEach + public void afterEach() throws InterruptedException { + if (thread != null) { + consumerTask.close(); + thread.join(10_000); + } + } + + /** + * Tests that the consumer task shuts down gracefully when there were no assignments. + */ + @Test + public void testCloseOnNoAssignment() throws InterruptedException { + thread.start(); + Thread.sleep(10); + } + + @Test + public void testIdempotentClose() { + thread.start(); + consumerTask.close(); + consumerTask.close(); + } + + @Test + public void testUserTopicIdPartitionEquals() { + final TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); + final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId)); + final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId)); + utp1.isInitialized = true; + utp1.isAssigned = true; + + assertFalse(utp2.isInitialized); + assertFalse(utp2.isAssigned); + assertEquals(utp1, utp2); + } + + @Test + public void testAddAssignmentsForPartitions() throws InterruptedException { + final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3); + final Map<TopicPartition, Long> endOffsets = idPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions)); + thread.start(); + for (final TopicIdPartition idPartition : idPartitions) { + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned"); + assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition))); + assertTrue(handler.isPartitionLoaded.get(idPartition)); + } + } + + @Test + public void testRemoveAssignmentsForPartitions() throws InterruptedException { + final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3); + final Map<TopicPartition, Long> endOffsets = allPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions)); + thread.start(); + + final TopicIdPartition tpId = allPartitions.get(0); + TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " to be assigned"); + addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(), + "Couldn't read record"); + + final Set<TopicIdPartition> removePartitions = Collections.singleton(tpId); + consumerTask.removeAssignmentsForPartitions(removePartitions); + for (final TopicIdPartition idPartition : allPartitions) { + final TestCondition condition = () -> removePartitions.contains(idPartition) == !consumerTask.isUserPartitionAssigned(idPartition); + TestUtils.waitForCondition(condition, "Timed out waiting for " + idPartition + " to be removed"); + } + for (TopicIdPartition removePartition : removePartitions) { + TestUtils.waitForCondition(() -> handler.isPartitionCleared.containsKey(removePartition), + "Timed out waiting for " + removePartition + " to be cleared"); + } + } + + @Test + public void testConcurrentPartitionAssignments() throws InterruptedException, ExecutionException { + final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 100); + final Map<TopicPartition, Long> endOffsets = allPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + + final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + Thread assignor = new Thread(() -> { + int partitionsAssigned = 0; + for (TopicIdPartition partition : allPartitions) { + if (partitionsAssigned == 50) { + // Once half the topic partitions are assigned, wait for the consumer to catch up. This ensures + // that the consumer is already running when the rest of the partitions are assigned. 
+ try { + latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + } + consumerTask.addAssignmentsForPartitions(Collections.singleton(partition)); + partitionsAssigned++; + } + isAllPartitionsAssigned.set(true); + }); + Runnable consumerRunnable = () -> { + try { + while (!isAllPartitionsAssigned.get()) { + consumerTask.maybeWaitForPartitionAssignments(); + latch.countDown(); + } + } catch (Exception e) { + fail(e.getMessage()); + } + }; + + ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(); + Future<?> future = consumerExecutor.submit(consumerRunnable); + assignor.start(); + + assignor.join(); + future.get(); + } + + @Test + public void testCanProcessRecord() throws InterruptedException { + final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg"); + final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); + final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1)); + final TopicIdPartition tpId2 = new TopicIdPartition(topicId, new TopicPartition("sample", 2)); + assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1)); + assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId2)); + + final int metadataPartition = partitioner.metadataPartition(tpId0); + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L)); + final Set<TopicIdPartition> assignments = Collections.singleton(tpId0); + consumerTask.addAssignmentsForPartitions(assignments); + thread.start(); + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + " to be assigned"); + + addRecord(consumer, metadataPartition, tpId0, 0); + addRecord(consumer, metadataPartition, tpId0, 1); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + assertEquals(2, handler.metadataCounter); + + // should only read the tpId1 records + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1)); + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned"); + addRecord(consumer, metadataPartition, tpId1, 2); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)), "Couldn't read record"); + assertEquals(3, handler.metadataCounter); + + // shouldn't read tpId2 records because it's not assigned + addRecord(consumer, metadataPartition, tpId2, 3); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), "Couldn't read record"); + assertEquals(3, handler.metadataCounter); + } + + @Test + public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException { + final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0); + final int metadataPartition = partitioner.metadataPartition(tpId); + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 2L)); + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); + thread.start(); + + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); + assertFalse(handler.isPartitionInitialized.containsKey(tpId)); Review 
Comment: Does this mean, when the partition contains no records, the partition is considered as uninitialized at this state? Does that make sense? ########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ########## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + + private final int 
numMetadataTopicPartitions = 5; + private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); + private final DummyEventHandler handler = new DummyEventHandler(); + private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() + .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); + private final Uuid topicId = Uuid.randomUuid(); + private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + + private ConsumerTask consumerTask; + private MockConsumer<byte[], byte[]> consumer; + private Thread thread; + + @BeforeEach + public void beforeEach() { + final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream() + .collect(Collectors.toMap(Function.identity(), e -> 0L)); + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updateBeginningOffsets(offsets); + ConsumerTask.pollIntervalMs = 10L; + consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); + thread = new Thread(consumerTask); + } + + @AfterEach + public void afterEach() throws InterruptedException { + if (thread != null) { + consumerTask.close(); + thread.join(); + } + } + + @Test + public void testCloseOnNoAssignment() throws InterruptedException { + thread.start(); + Thread.sleep(10); Review Comment: Could we add some assertion to verify `consumer task shuts down gracefully` as you expected? In the current test, even if the consumer task doesn't shut down, it still passes the test, right? ########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +63,403 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; - private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; + // The timeout for the consumer to poll records from the remote log metadata topic. + private final long pollTimeoutMs; private final Time time; - // It indicates whether the closing process has been started or not. If it is set as true, - // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - - // It indicates whether the consumer needs to assign the partitions or not. This is set when it is - // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + // It indicates whether the ConsumerTask is closed or not. + private volatile boolean isClosed = false; + // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment + // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the + // ones it is no longer assigned to. 
+ // The initial value is set to true to wait for partition assignment on the first execution; otherwise thread will + // be busy without actually doing anything + private volatile boolean hasAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. - private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet(); + private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. - private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet(); + private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap(); + private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); - // Map of remote log metadata topic partition to consumed offsets. Received consumer records - // may or may not have been processed based on the assigned topic partitions. - private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>(); + private long uninitializedAt; + private boolean isAllUserTopicPartitionsInitialized; - // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. - private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); + // Map of remote log metadata topic partition to consumed offsets. + private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); + private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>(); - private final long committedOffsetSyncIntervalMs; - private CommittedOffsetsFile committedOffsetsFile; - private long lastSyncedTimeMs; + private Map<TopicPartition, StartAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>(); + private boolean hasLastOffsetsFetchFailed = false; + private long lastFailedFetchOffsetsTimestamp; + // The interval between retries to fetch the start and end offsets for the metadata partitions after a failed fetch. 
+ private final long offsetFetchRetryIntervalMs; - public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer, - String metadataTopicName, - RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, + public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, - Path committedOffsetsPath, - Time time, - long committedOffsetSyncIntervalMs) { - this.consumer = Objects.requireNonNull(consumer); - this.metadataTopicName = Objects.requireNonNull(metadataTopicName); + Consumer<byte[], byte[]> consumer, + long pollTimeoutMs, + long offsetFetchRetryIntervalMs, + Time time) { + this.consumer = consumer; this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); this.topicPartitioner = Objects.requireNonNull(topicPartitioner); + this.pollTimeoutMs = pollTimeoutMs; + this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs; this.time = Objects.requireNonNull(time); - this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; - - initializeConsumerAssignment(committedOffsetsPath); - } - - private void initializeConsumerAssignment(Path committedOffsetsPath) { - try { - committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); - } catch (IOException e) { - throw new KafkaException(e); - } - - Map<Integer, Long> committedOffsets = Collections.emptyMap(); - try { - // Load committed offset and assign them in the consumer. - committedOffsets = committedOffsetsFile.readEntries(); - } catch (IOException e) { - // Ignore the error and consumer consumes from the earliest offset. - log.error("Encountered error while building committed offsets from the file. " + - "Consumer will consume from the earliest offset for the assigned partitions.", e); - } - - if (!committedOffsets.isEmpty()) { - // Assign topic partitions from the earlier committed offsets file. 
- Set<Integer> earlierAssignedPartitions = committedOffsets.keySet(); - assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions); - Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream() - .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x)) - .collect(Collectors.toSet()); - consumer.assign(metadataTopicPartitions); - - // Seek to the committed offsets - for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) { - log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey()); - partitionToConsumedOffsets.put(entry.getKey(), entry.getValue()); - consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue()); - } - - lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets); - } + this.uninitializedAt = time.milliseconds(); } @Override public void run() { - log.info("Started Consumer task thread."); - lastSyncedTimeMs = time.milliseconds(); - try { - while (!closing) { - maybeWaitForPartitionsAssignment(); + log.info("Starting consumer task thread."); + while (!isClosed) { + try { + if (hasAssignmentChanged) { + maybeWaitForPartitionAssignments(); + } log.trace("Polling consumer to receive remote log metadata topic records"); - ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS)); + final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { processConsumerRecord(record); } - - maybeSyncCommittedDataAndOffsets(false); + maybeMarkUserPartitionsAsReady(); + } catch (final WakeupException ex) { + // ignore logging the error + isClosed = true; + } catch (final RetriableException ex) { + log.warn("Retriable error occurred while processing the records. Retrying...", ex); + } catch (final Exception ex) { + isClosed = true; + log.error("Error occurred while processing the records", ex); } - } catch (Exception e) { - log.error("Error occurred in consumer task, close:[{}]", closing, e); - } finally { - maybeSyncCommittedDataAndOffsets(true); - closeConsumer(); - log.info("Exiting from consumer task thread"); } + try { + consumer.close(Duration.ofSeconds(30)); + } catch (final Exception e) { + log.error("Error encountered while closing the consumer", e); + } + log.info("Exited from consumer task thread"); } private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) { - // Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions - // and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed - // partitions. 
- RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); - synchronized (assignPartitionsLock) { - if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) { - remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); - } else { - log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata); - } - log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); - partitionToConsumedOffsets.put(record.partition(), record.offset()); + final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); + if (shouldProcess(remoteLogMetadata, record.offset())) { + remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); + readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset()); + } else { + log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata); } + log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); + readOffsetsByMetadataPartition.put(record.partition(), record.offset()); } - private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { - // Return immediately if there is no consumption from last time. - boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets); - if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) { - log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync); + private boolean shouldProcess(final RemoteLogMetadata metadata, final long recordOffset) { + final TopicIdPartition tpId = metadata.topicIdPartition(); + final Long readOffset = readOffsetsByUserTopicPartition.get(tpId); + return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset); + } + + private void maybeMarkUserPartitionsAsReady() { + if (isAllUserTopicPartitionsInitialized) { return; } - - try { - // Need to take lock on assignPartitionsLock as assignedTopicPartitions might - // get updated by other threads. - synchronized (assignPartitionsLock) { - for (TopicIdPartition topicIdPartition : assignedTopicPartitions) { - int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition); - Long offset = partitionToConsumedOffsets.get(metadataPartition); - if (offset != null) { - remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset); + maybeFetchStartAndEndOffsets(); + boolean isAllInitialized = true; + for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) { + if (utp.isAssigned && !utp.isInitialized) { + final Integer metadataPartition = utp.metadataPartition; + final StartAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition)); + // The offset-holder can be null, when the recent assignment wasn't picked up by the consumer. + if (holder != null) { + final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L); + // 1) The end-offset was fetched only once during reassignment. The metadata-partition can receive + // new stream of records, so the consumer can read records more than the last-fetched end-offset. 
+ // 2) When the internal topic becomes empty due to breach by size/time/start-offset, then there + // are no records to read. + if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.startOffset)) { + markInitialized(utp); } else { - log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset", - topicIdPartition, metadataPartition); + log.debug("The user-topic-partition {} could not be marked initialized since the read-offset is {} " + + "but the end-offset is {} for the metadata-partition {}", utp, readOffset, holder.endOffset, + metadataPartition); } + } else { + log.debug("The offset-holder is null for the metadata-partition {}. The consumer may not have picked" + + " up the recent assignment", metadataPartition); } - - // Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again - // in case of restarts. - committedOffsetsFile.writeEntries(partitionToConsumedOffsets); - lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets); } - - lastSyncedTimeMs = time.milliseconds(); - } catch (IOException e) { - throw new KafkaException("Error encountered while writing committed offsets to a local file", e); + isAllInitialized = isAllInitialized && utp.isInitialized; Review Comment: nit: Should we add a `utp.isAssigned` check here? `isAllInitialized = isAllInitialized && utp.isAssigned && utp.isInitialized;`
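To make the nit above concrete, here is a minimal standalone sketch of how the suggested `utp.isAssigned` guard changes the readiness computation. The tiny `Utp` holder and the sample values are hypothetical stand-ins for `ConsumerTask.UserTopicIdPartition`, not the PR's code: with the guard, a partition whose `isInitialized` flag is set but which is not currently assigned keeps `isAllInitialized` false, whereas the line as written would let it count as ready.

```java
import java.util.Arrays;
import java.util.List;

public class ReadinessCheckSketch {

    // Hypothetical stand-in for ConsumerTask.UserTopicIdPartition: only the two flags under discussion.
    static final class Utp {
        final boolean isAssigned;
        final boolean isInitialized;

        Utp(boolean isAssigned, boolean isInitialized) {
            this.isAssigned = isAssigned;
            this.isInitialized = isInitialized;
        }
    }

    public static void main(String[] args) {
        // Second entry: initialized from an earlier assignment, but the consumer no longer owns it.
        List<Utp> partitions = Arrays.asList(new Utp(true, true), new Utp(false, true));

        boolean asWritten = true;
        boolean withGuard = true;
        for (Utp utp : partitions) {
            asWritten = asWritten && utp.isInitialized;                   // line as it appears in the PR
            withGuard = withGuard && utp.isAssigned && utp.isInitialized; // reviewer's suggested variant
        }
        System.out.println("as written: " + asWritten); // true
        System.out.println("with guard: " + withGuard); // false
    }
}
```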
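On the earlier comment about `testCloseOnNoAssignment`: one possible shape for the missing assertion, reusing the test class's existing `thread` and `consumerTask` fields, is sketched below. This is only an illustration, not the author's implementation; the 10-second join timeout is an arbitrary choice.

```java
@Test
public void testCloseOnNoAssignment() throws InterruptedException {
    thread.start();
    // Ask the task to stop, then verify that the run() loop actually terminates.
    consumerTask.close();
    thread.join(10_000);
    assertFalse(thread.isAlive(), "Consumer task thread did not shut down within 10 seconds");
}
```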
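On the earlier question about a partition that contains no records: judging from the diff above, a user partition is marked initialized as soon as the metadata partition's start and end offsets are equal, so it is only a non-empty metadata partition that has not yet been caught up that leaves the user partition uninitialized. A sketch of a test that would pin down the empty-partition case follows; the test name is made up, and it reuses helpers already present in `ConsumerTaskTest`.

```java
@Test
public void testEmptyMetadataPartitionMarksUserPartitionInitialized() throws InterruptedException {
    final TopicIdPartition tpId = getIdPartitions("empty", 1).get(0);
    final int metadataPartition = partitioner.metadataPartition(tpId);
    // Start and end offsets are both 0, i.e. the metadata partition holds no records to catch up on.
    consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L));
    consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
    thread.start();

    TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId),
        "Timed out waiting for " + tpId + " to be assigned");
    // With nothing to read, the user partition is expected to be marked initialized right away.
    TestUtils.waitForCondition(() -> handler.isPartitionInitialized.containsKey(tpId),
        "Timed out waiting for " + tpId + " to be initialized");
}
```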