gemini-code-assist[bot] commented on code in PR #38613:
URL: https://github.com/apache/beam/pull/38613#discussion_r3292334166


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -76,40 +86,55 @@ static class CommitOffsetDoFn extends 
DoFn<KV<KafkaSourceDescriptor, Long>, Void
     @RequiresStableInput
     @ProcessElement
     public void processElement(@Element KV<KafkaSourceDescriptor, Long> 
element) {
+
       Map<String, Object> updatedConsumerConfig =
           overrideBootstrapServersConfig(consumerConfig, element.getKey());
+
       try (Consumer<byte[], byte[]> consumer = 
consumerFactoryFn.apply(updatedConsumerConfig)) {
+
         try {
           consumer.commitSync(
               Collections.singletonMap(
                   element.getKey().getTopicPartition(),
                   new OffsetAndMetadata(element.getValue() + 1)));
+
         } catch (Exception e) {
-          // TODO: consider retrying.
+
+          commitFailures.inc();
+          retriesExhausted.inc();
+
+          // TODO: consider retrying and increment retry-attempt metrics.
           LOG.warn("Getting exception when committing offset: {}", 
e.getMessage());
         }
       }
     }
 
     private Map<String, Object> overrideBootstrapServersConfig(
         Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
+
       checkState(
           currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
               || description.getBootStrapServers() != null);
+
       Map<String, Object> config = new HashMap<>(currentConfig);
+
       if (description.getBootStrapServers() != null
           && !description.getBootStrapServers().isEmpty()) {
+
         config.put(
             ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
             String.join(",", description.getBootStrapServers()));
       }
+

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Removing unnecessary blank lines to maintain consistency with the project's 
coding style.
   
   ```java
         checkState(
             currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
                 || description.getBootStrapServers() != null);
         Map<String, Object> config = new HashMap<>(currentConfig);
         if (description.getBootStrapServers() != null
             && !description.getBootStrapServers().isEmpty()) {
           config.put(
               ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
               String.join(",", description.getBootStrapServers()));
         }
   
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -62,7 +64,15 @@ public class KafkaCommitOffset<K, V>
 
   static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, 
Void> {
     private static final Logger LOG = 
LoggerFactory.getLogger(CommitOffsetDoFn.class);
+
+    private final Counter commitFailures =
+        Metrics.counter(CommitOffsetDoFn.class, "commit-failures");
+
+    private final Counter retriesExhausted =
+        Metrics.counter(CommitOffsetDoFn.class, "retries-exhausted");
+

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `retriesExhausted` metric is currently redundant with `commitFailures` 
because no retry logic is implemented yet. It is better to introduce this 
metric when retries are actually added. Additionally, there is an excessive 
amount of vertical whitespace being introduced throughout the file which 
deviates from the existing code style.
   
   ```suggestion
       private final Counter commitFailures =
           Metrics.counter(CommitOffsetDoFn.class, "commit-failures");
   
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -76,40 +86,55 @@ static class CommitOffsetDoFn extends 
DoFn<KV<KafkaSourceDescriptor, Long>, Void
     @RequiresStableInput
     @ProcessElement
     public void processElement(@Element KV<KafkaSourceDescriptor, Long> 
element) {
+
       Map<String, Object> updatedConsumerConfig =
           overrideBootstrapServersConfig(consumerConfig, element.getKey());
+
       try (Consumer<byte[], byte[]> consumer = 
consumerFactoryFn.apply(updatedConsumerConfig)) {
+
         try {
           consumer.commitSync(
               Collections.singletonMap(
                   element.getKey().getTopicPartition(),
                   new OffsetAndMetadata(element.getValue() + 1)));
+
         } catch (Exception e) {
-          // TODO: consider retrying.
+
+          commitFailures.inc();
+          retriesExhausted.inc();
+
+          // TODO: consider retrying and increment retry-attempt metrics.
           LOG.warn("Getting exception when committing offset: {}", 
e.getMessage());
         }
       }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Removing the redundant metric increment and cleaning up the excessive 
vertical whitespace to improve code density and readability.
   
   ```java
         Map<String, Object> updatedConsumerConfig =
             overrideBootstrapServersConfig(consumerConfig, element.getKey());
         try (Consumer<byte[], byte[]> consumer = 
consumerFactoryFn.apply(updatedConsumerConfig)) {
           try {
             consumer.commitSync(
                 Collections.singletonMap(
                     element.getKey().getTopicPartition(),
                     new OffsetAndMetadata(element.getValue() + 1)));
           } catch (Exception e) {
             commitFailures.inc();
             // TODO: consider retrying and increment retry-attempt metrics.
             LOG.warn("Getting exception when committing offset: {}", 
e.getMessage());
           }
         }
   
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -158,26 +187,34 @@ public void processElement(
     @FinishBundle
     @SuppressWarnings("nullness") // startBundle guaranteed to initialize
     public void finishBundle(FinishBundleContext context) {
+
       maxObserved.forEach(
           (k, v) -> context.output(KV.of(k, v.offset), v.timestamp, 
GlobalWindow.INSTANCE));
     }
   }
 
   @Override
   public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor, 
KafkaRecord<K, V>>> input) {
+
     try {
+
       PCollection<KV<KafkaSourceDescriptor, Long>> offsets;
+
       if (use259implementation) {
+
         offsets =
             input.apply(
                 MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor, 
Long>>() {})
                     .via(element -> KV.of(element.getKey(), 
element.getValue().getOffset())));
+
       } else {
+
         // Reduce the amount of data to combine by calculating a max within 
the generally dense
         // bundles of reading
         // from a Kafka partition.
         offsets = input.apply(ParDo.of(new MaxOffsetFn<>()));
       }
+

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Cleaning up excessive whitespace in the `expand` method to improve 
readability.
   
   ```java
       try {
         PCollection<KV<KafkaSourceDescriptor, Long>> offsets;
         if (use259implementation) {
           offsets =
               input.apply(
                   MapElements.into(new 
TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
                       .via(element -> KV.of(element.getKey(), 
element.getValue().getOffset())));
         } else {
           // Reduce the amount of data to combine by calculating a max within 
the generally dense
           // bundles of reading
           // from a Kafka partition.
           offsets = input.apply(ParDo.of(new MaxOffsetFn<>()));
         }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to