yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1219524567


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {

Review Comment:
   I'm wondering whether we should attempt to delete the consumer group even 
in this case, to keep reset behavior consistent: if no offsets have been 
committed for the group yet, the current implementation won't delete the 
consumer group. The only minor downside is that we'd need special-case 
handling for any `GroupIdNotFoundException` arising from calls to 
`Admin::deleteConsumerGroups`. Interestingly, `Admin::listConsumerGroupOffsets` 
doesn't throw for non-existent groups; it returns an empty partition offset 
map instead.
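
   To make the special-case handling concrete, here is a rough sketch of what 
I have in mind. It is not code from this PR: `DeleteGroupSketch` and 
`awaitDeletion` are hypothetical names, the nested `GroupIdNotFoundException` 
is a local stand-in for `org.apache.kafka.common.errors.GroupIdNotFoundException`, 
and a plain `java.util.concurrent.Future` stands in for the `KafkaFuture<Void>` 
returned by `Admin::deleteConsumerGroups(...).all()`, so that the snippet 
compiles without the Kafka client on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeleteGroupSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.GroupIdNotFoundException,
    // declared locally so this sketch has no Kafka dependency.
    static class GroupIdNotFoundException extends RuntimeException {
        GroupIdNotFoundException(String message) {
            super(message);
        }
    }

    /**
     * Waits for a consumer group deletion to finish.
     *
     * Returns true if the group was deleted and false if it did not exist,
     * which a reset request can treat as success. Any other failure is rethrown.
     */
    static boolean awaitDeletion(Future<Void> deletion, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            deletion.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            // Failures of the asynchronous call surface as the cause of the ExecutionException.
            if (e.getCause() instanceof GroupIdNotFoundException) {
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate deleting a group that does not exist.
        CompletableFuture<Void> missingGroup = new CompletableFuture<>();
        missingGroup.completeExceptionally(new GroupIdNotFoundException("no such group"));
        System.out.println(awaitDeletion(missingGroup, 1000L));

        // Simulate a successful deletion.
        CompletableFuture<Void> deleted = CompletableFuture.completedFuture(null);
        System.out.println(awaitDeletion(deleted, 1000L));
    }
}
```

   In the worker, the `false` branch would presumably just be logged at debug 
level and the reset reported as successful, since a missing group already 
means there are no offsets left to reset.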


