shanthoosh commented on a change in pull request #951: SAMZA-2127: Upgrade to
Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r268385422
##########
File path:
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
##########
@@ -628,10 +589,12 @@ public void validateStream(StreamSpec streamSpec) throws StreamValidationExcepti
   @Override
   public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
     if (deleteCommittedMessages) {
-      if (adminClientForDelete == null) {
-        adminClientForDelete = kafka.admin.AdminClient.create(createAdminClientProperties());
-      }
-      KafkaSystemAdminUtilsScala.deleteMessages(adminClientForDelete, offsets);
+      Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet()
+          .stream()
+          .collect(Collectors.toMap(entry ->
+              new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
+              entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
+      adminClient.deleteRecords(recordsToDelete);
Review comment:
1. Please correct me if I'm wrong.
KafkaAdminClient exposes
[CompletableFuture-based](https://kafka.apache.org/20/javadoc/org/apache/kafka/common/KafkaFuture.html#KafkaFuture--)
async APIs, so we have to take the result of the `deleteRecords` call and
wait for the request to complete. Something like:
```java
DeleteRecordsResult deletionRecordsResult = adminClient.deleteRecords(recordsToDelete);
deletionRecordsResult.all().get(timeout, TimeUnit.MILLISECONDS);
```
We do something similar for
[createTopic](https://github.com/apache/samza/blob/cb51c54fb1d1e0bf94e14cf4288f0a18b72a6bee/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java#L480)
in this class. Just invoking the `deleteRecords` API wouldn't suffice, right?
2. For the delete-message operation, we're using the admin client's default
request timeout (120 seconds if no delete request timeout is defined; see
[config-ref](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L137)).
However, we've used `KafkaSystemAdmin.KAFKA_ADMIN_OPS_TIMEOUT_MS` uniformly
for all admin-client operations in this class. It would be better to stick to
the same admin timeout or override the delete timeout explicitly here.
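
To illustrate both points together, here is a minimal, self-contained sketch. It does not use the Kafka client: `java.util.concurrent.CompletableFuture` stands in for the `KafkaFuture` returned by `DeleteRecordsResult.all()`, and `fakeDeleteRecords`, `deleteAndWait`, and the `ADMIN_OPS_TIMEOUT_MS` value are hypothetical names chosen for the example, not code from this PR. The idea is only to show that failures and timeouts surface when the caller blocks on the future with an explicit, shared timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeleteRecordsWaitSketch {
    // Hypothetical stand-in for KafkaSystemAdmin.KAFKA_ADMIN_OPS_TIMEOUT_MS; the value is illustrative.
    static final long ADMIN_OPS_TIMEOUT_MS = 500;

    // Hypothetical stand-in for adminClient.deleteRecords(...).all():
    // completes asynchronously after delayMs, exceptionally if fail is true.
    static CompletableFuture<Void> fakeDeleteRecords(long delayMs, boolean fail) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (fail) {
                throw new IllegalStateException("simulated broker error");
            }
        });
    }

    // Blocks until the delete completes or the shared admin timeout elapses,
    // so that errors are observed instead of being silently dropped.
    static String deleteAndWait(long delayMs, boolean fail) {
        CompletableFuture<Void> result = fakeDeleteRecords(delayMs, fail);
        try {
            result.get(ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return "deleted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(deleteAndWait(10, false));   // completes within the timeout
        System.out.println(deleteAndWait(10, true));    // failure is reported to the caller
        System.out.println(deleteAndWait(2000, false)); // exceeds the shared timeout
    }
}
```

In the real code the same shape would presumably apply to `deletionRecordsResult.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS)`; how the resulting exceptions should be handled (log, rethrow as `SamzaException`, etc.) is left open here.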