yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1219524567
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName,
Connector connector, Map<String,
Admin admin = adminFactory.apply(adminConfig);
try {
- List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
- Map<TopicPartition, OffsetAndMetadata> offsetsToAlter =
parsedOffsets.entrySet()
- .stream()
- .filter(entry -> entry.getValue() != null)
- .collect(Collectors.toMap(Map.Entry::getKey, e ->
new OffsetAndMetadata(e.getValue())));
-
- if (!offsetsToAlter.isEmpty()) {
- log.debug("Committing the following consumer group
offsets using an admin client for sink connector {}: {}.",
- connName, offsetsToAlter);
- AlterConsumerGroupOffsetsOptions
alterConsumerGroupOffsetsOptions = new
AlterConsumerGroupOffsetsOptions().timeoutMs(
+ Map<TopicPartition, Long> offsetsToWrite;
+ if (isReset) {
+ offsetsToWrite = new HashMap<>();
+ ListConsumerGroupOffsetsOptions
listConsumerGroupOffsetsOptions = new
ListConsumerGroupOffsetsOptions().timeoutMs(
(int)
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
- AlterConsumerGroupOffsetsResult
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId,
offsetsToAlter,
- alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+ try {
+ admin.listConsumerGroupOffsets(groupId,
listConsumerGroupOffsetsOptions)
+ .partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .forEach((topicPartition,
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+ log.debug("Found the following topic partitions
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+ connName, groupId,
offsetsToWrite.keySet());
+ } catch (Exception e) {
+ Utils.closeQuietly(admin, "Offset reset admin for
sink connector " + connName);
+ log.error("Failed to list offsets prior to
resetting sink connector offsets", e);
+ cb.onCompletion(new ConnectException("Failed to
list offsets prior to resetting sink connector offsets", e), null);
+ return;
+ }
+ } else {
+ offsetsToWrite =
SinkUtils.parseSinkConnectorOffsets(offsets);
}
- Set<TopicPartition> partitionsToReset =
parsedOffsets.entrySet()
- .stream()
- .filter(entry -> entry.getValue() == null)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- if (!partitionsToReset.isEmpty()) {
- log.debug("Deleting the consumer group offsets for the
following topic partitions using an admin client for sink connector {}: {}.",
- connName, partitionsToReset);
- DeleteConsumerGroupOffsetsOptions
deleteConsumerGroupOffsetsOptions = new
DeleteConsumerGroupOffsetsOptions().timeoutMs(
- (int)
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
- DeleteConsumerGroupOffsetsResult
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId,
partitionsToReset,
- deleteConsumerGroupOffsetsOptions);
+ boolean alterOffsetsResult;
+ try {
+ alterOffsetsResult = ((SinkConnector)
connector).alterOffsets(connectorConfig, offsetsToWrite);
+ } catch (UnsupportedOperationException e) {
+ throw new ConnectException("Failed to modify offsets
for connector " + connName + " because it doesn't support external " +
+ "modification of offsets", e);
+ }
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+ // This should only occur for an offset reset request when:
+ // 1. There was a prior attempt to reset offsets
+ // OR
+ // 2. No offsets have been committed yet
+ if (offsetsToWrite.isEmpty()) {
Review Comment:
I'm wondering whether we should go ahead and attempt to delete the consumer
group even in this case, in order to avoid inconsistency (if no offsets have
been committed for the group yet, the current implementation won't delete the
consumer group). The only minor downside is that we'd need special-case
handling for `GroupIdNotFoundException`s arising from calls to
`Admin::deleteConsumerGroups` (interestingly, `Admin::listConsumerGroupOffsets`
doesn't result in an exception for non-existent groups, but instead returns an
empty partition-to-offset map).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]