C0urante commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1238782864
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions() + .timeoutMs((int) timer.remainingMs()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + .partitionsToOffsetAndMetadata() + .get(timer.remainingMs(), TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + timer.update(); + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch (Exception e) { + Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); + log.error("Failed to list offsets prior to resetting offsets for sink connector {}", connName, e); + cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting offsets for sink connector " + connName, e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); - - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets", + connName, e); + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); } + updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for sink connector " + connName); Review Comment: > Maybe we could add a hint that offsets that appear non-existent should be handled gracefully if possible (this also covers the case where an alter offsets request is used to skip over some data for sink connectors)? I like this, but I wish we could be a little more explicit. I think ideally we want connectors to respond with a no-op for resets of partitions that don't appear to exist, and for non-resets (assuming they do external offset tracking at all) either store that partition/offset pair externally, or throw an exception to indicate that the operation cannot be performed. Obviously that's a bit wordy on its own and I agree that we shouldn't get too wordy in the Javadoc... feel free to push the best you can come up with and if necessary we can either workshop it further in this PR or a follow-up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org