gemini-code-assist[bot] commented on code in PR #38613:
URL: https://github.com/apache/beam/pull/38613#discussion_r3292334166
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -76,40 +86,55 @@ static class CommitOffsetDoFn extends
DoFn<KV<KafkaSourceDescriptor, Long>, Void
@RequiresStableInput
@ProcessElement
public void processElement(@Element KV<KafkaSourceDescriptor, Long>
element) {
+
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, element.getKey());
+
try (Consumer<byte[], byte[]> consumer =
consumerFactoryFn.apply(updatedConsumerConfig)) {
+
try {
consumer.commitSync(
Collections.singletonMap(
element.getKey().getTopicPartition(),
new OffsetAndMetadata(element.getValue() + 1)));
+
} catch (Exception e) {
- // TODO: consider retrying.
+
+ commitFailures.inc();
+ retriesExhausted.inc();
+
+ // TODO: consider retrying and increment retry-attempt metrics.
LOG.warn("Getting exception when committing offset: {}",
e.getMessage());
}
}
}
private Map<String, Object> overrideBootstrapServersConfig(
Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
+
checkState(
currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
|| description.getBootStrapServers() != null);
+
Map<String, Object> config = new HashMap<>(currentConfig);
+
if (description.getBootStrapServers() != null
&& !description.getBootStrapServers().isEmpty()) {
+
config.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
String.join(",", description.getBootStrapServers()));
}
+
Review Comment:

Please remove the unnecessary blank lines added here to stay consistent with
the project's coding style:
```java
checkState(
currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
|| description.getBootStrapServers() != null);
Map<String, Object> config = new HashMap<>(currentConfig);
if (description.getBootStrapServers() != null
&& !description.getBootStrapServers().isEmpty()) {
config.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
String.join(",", description.getBootStrapServers()));
}
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -62,7 +64,15 @@ public class KafkaCommitOffset<K, V>
static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>,
Void> {
private static final Logger LOG =
LoggerFactory.getLogger(CommitOffsetDoFn.class);
+
+ private final Counter commitFailures =
+ Metrics.counter(CommitOffsetDoFn.class, "commit-failures");
+
+ private final Counter retriesExhausted =
+ Metrics.counter(CommitOffsetDoFn.class, "retries-exhausted");
+
Review Comment:

The `retriesExhausted` metric is currently redundant with `commitFailures`:
no retry logic is implemented yet, so both counters are always incremented
together in the same catch block. It is better to introduce this metric when
retries are actually added. Additionally, this change introduces excessive
vertical whitespace throughout the file, which deviates from the existing
code style.
```suggestion
private final Counter commitFailures =
Metrics.counter(CommitOffsetDoFn.class, "commit-failures");
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -76,40 +86,55 @@ static class CommitOffsetDoFn extends
DoFn<KV<KafkaSourceDescriptor, Long>, Void
@RequiresStableInput
@ProcessElement
public void processElement(@Element KV<KafkaSourceDescriptor, Long>
element) {
+
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, element.getKey());
+
try (Consumer<byte[], byte[]> consumer =
consumerFactoryFn.apply(updatedConsumerConfig)) {
+
try {
consumer.commitSync(
Collections.singletonMap(
element.getKey().getTopicPartition(),
new OffsetAndMetadata(element.getValue() + 1)));
+
} catch (Exception e) {
- // TODO: consider retrying.
+
+ commitFailures.inc();
+ retriesExhausted.inc();
+
+ // TODO: consider retrying and increment retry-attempt metrics.
LOG.warn("Getting exception when committing offset: {}",
e.getMessage());
}
}
Review Comment:

Please remove the redundant `retriesExhausted.inc()` call and clean up the
excessive vertical whitespace to improve code density and readability:
```java
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, element.getKey());
try (Consumer<byte[], byte[]> consumer =
consumerFactoryFn.apply(updatedConsumerConfig)) {
try {
consumer.commitSync(
Collections.singletonMap(
element.getKey().getTopicPartition(),
new OffsetAndMetadata(element.getValue() + 1)));
} catch (Exception e) {
commitFailures.inc();
// TODO: consider retrying and increment retry-attempt metrics.
LOG.warn("Getting exception when committing offset: {}",
e.getMessage());
}
}
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -158,26 +187,34 @@ public void processElement(
@FinishBundle
@SuppressWarnings("nullness") // startBundle guaranteed to initialize
public void finishBundle(FinishBundleContext context) {
+
maxObserved.forEach(
(k, v) -> context.output(KV.of(k, v.offset), v.timestamp,
GlobalWindow.INSTANCE));
}
}
@Override
public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor,
KafkaRecord<K, V>>> input) {
+
try {
+
PCollection<KV<KafkaSourceDescriptor, Long>> offsets;
+
if (use259implementation) {
+
offsets =
input.apply(
MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor,
Long>>() {})
.via(element -> KV.of(element.getKey(),
element.getValue().getOffset())));
+
} else {
+
// Reduce the amount of data to combine by calculating a max within
the generally dense
// bundles of reading
// from a Kafka partition.
offsets = input.apply(ParDo.of(new MaxOffsetFn<>()));
}
+
Review Comment:

Please clean up the excessive whitespace in the `expand` method to improve
readability:
```java
try {
PCollection<KV<KafkaSourceDescriptor, Long>> offsets;
if (use259implementation) {
offsets =
input.apply(
MapElements.into(new
TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
.via(element -> KV.of(element.getKey(),
element.getValue().getOffset())));
} else {
// Reduce the amount of data to combine by calculating a max within
the generally dense
// bundles of reading
// from a Kafka partition.
offsets = input.apply(ParDo.of(new MaxOffsetFn<>()));
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]