rzo1 commented on code in PR #8589:
URL: https://github.com/apache/storm/pull/8589#discussion_r3201180031


##########
external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java:
##########
@@ -172,12 +172,13 @@ public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery
                 }
             }
             consumer.assign(topicPartitionList);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(new HashSet<>(topicPartitionList));
+            consumer.seekToEnd(topicPartitionList);
             for (TopicPartition topicPartition : topicPartitionList) {
-                Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = consumer.committed(Collections.singleton(topicPartition));
-                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.get(topicPartition).offset() : -1;
-                consumer.seekToEnd(toArrayList(topicPartition));
+                OffsetAndMetadata partitionOffset = committedOffsets.get(topicPartition);
+                long committedOffset = partitionOffset != null ? partitionOffset.offset() : -1;
                 result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset,
-                                                    consumer.position(topicPartition)));
+                        consumer.position(topicPartition)));

Review Comment:
   The fix here is good — bulk `committed(Set)` + null-check is exactly right, 
and moving `seekToEnd(topicPartitionList)` out of the per-partition loop is a 
nice cleanup.
   
   My only ask is to **separate the bug fix from the formatting churn** in this 
file. Outside this loop, most of the diff is reformatting (continuation indent 
4 → 8, Javadoc whitespace). That makes a clean cherry-pick of the fix to 
maintenance branches harder, and obscures what's actually changing for someone 
bisecting later. Suggest two commits: one for the fix, one for the formatting — 
or just drop the formatting touches from this PR.
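   
   For anyone reading the diff later, here is a minimal sketch of why the 
per-entry null check matters. It assumes, as I understand the Kafka client 
Javadoc, that the map returned by bulk `committed(Set)` can carry a `null` 
value for a partition with no committed offset. The class and helper names are 
hypothetical, and `Map<String, Long>` stands in for 
`Map<TopicPartition, OffsetAndMetadata>` so the snippet runs without Kafka on 
the classpath:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   class CommittedOffsetSketch {
       // Hypothetical helper mirroring the null check in the diff.
       // Long stands in for OffsetAndMetadata; String stands in for TopicPartition.
       static long committedOrMinusOne(Map<String, Long> committed, String partition) {
           Long offset = committed.get(partition);
           // A missing key and an explicit null value both mean "nothing committed yet".
           return offset != null ? offset : -1L;
       }

       public static void main(String[] args) {
           Map<String, Long> committed = new HashMap<>();
           committed.put("topic-0", 42L);
           committed.put("topic-1", null); // partition known, but no committed offset
           System.out.println(committedOrMinusOne(committed, "topic-0"));
           System.out.println(committedOrMinusOne(committed, "topic-1"));
       }
   }
   ```
   
   The removed code checked the map itself for null and then dereferenced the 
entry, which is the case this helper guards against.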



##########
storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java:
##########
@@ -171,10 +171,15 @@ private static Map<String, Object> getLagResultForKafka(String spoutId, SpoutSpe
                     String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
 
                     try {
-                        result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
+                        Object parsed = JSONValue.parseWithException(resultFromMonitor);
+                        if (parsed instanceof Map) {
+                            result = (Map<String, Object>) parsed;
+                        } else {
+                            // json-smart parses unquoted plain text leniently as a String, so we can land here
+                            // when the monitor printed an error message instead of JSON; surface it as the error.
+                            errorMsg = resultFromMonitor;
+                        }
                     } catch (ParseException e) {
-                        LOGGER.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
-                        // json parsing fail -> error received
                         errorMsg = resultFromMonitor;

Review Comment:
   Tiny nit: the previous code had a `LOGGER.debug("JSON parsing failed, 
assuming message as error message: {}", resultFromMonitor)` here. Now that the 
new `instanceof Map` path silently falls through to `errorMsg` *and* the 
`ParseException` path silently does too, there's no log line that captures "the 
monitor said something we couldn't parse."
   
   The error does propagate via `errorMsg`, so users still see it, but losing 
the debug log makes future debugging harder. Suggest emitting the same 
debug-level log line from both branches, e.g. `LOGGER.debug("Monitor returned 
non-JSON output, treating as error: {}", resultFromMonitor);`. That keeps the 
observability you had before at no extra cost.
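   
   One way to avoid duplicating the log call is to route both branches through 
a small helper. This is only a sketch under stated assumptions: `Parser`, 
`Outcome`, `logNonJson` and `interpret` are hypothetical names, `Parser` stands 
in for `JSONValue.parseWithException`, and `java.util.logging` at `FINE` stands 
in for the SLF4J debug logger so the snippet has no external dependencies:
   
   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.logging.Logger;

   class MonitorOutputSketch {
       private static final Logger LOGGER = Logger.getLogger(MonitorOutputSketch.class.getName());

       /** Hypothetical stand-in for the JSON parser, so the sketch needs no JSON library. */
       interface Parser {
           Object parse(String raw) throws Exception;
       }

       /** Hypothetical holder: exactly one of result / errorMsg is non-null. */
       static final class Outcome {
           final Map<String, Object> result;
           final String errorMsg;

           Outcome(Map<String, Object> result, String errorMsg) {
               this.result = result;
               this.errorMsg = errorMsg;
           }
       }

       // Single place that logs the raw output and returns it as the error message.
       static String logNonJson(String raw) {
           LOGGER.fine(() -> "Monitor returned non-JSON output, treating as error: " + raw);
           return raw;
       }

       @SuppressWarnings("unchecked")
       static Outcome interpret(String raw, Parser parser) {
           try {
               Object parsed = parser.parse(raw);
               if (parsed instanceof Map) {
                   return new Outcome((Map<String, Object>) parsed, null);
               }
               // Parsed, but not a JSON object (for example a lenient parse to a String).
               return new Outcome(null, logNonJson(raw));
           } catch (Exception e) {
               // The parser rejected the input outright.
               return new Outcome(null, logNonJson(raw));
           }
       }

       public static void main(String[] args) {
           Outcome ok = interpret("{}", raw -> Collections.emptyMap());
           Outcome lenient = interpret("No such topic", raw -> raw);
           Outcome failed = interpret("{broken", raw -> { throw new Exception("parse failure"); });
           System.out.println(ok.errorMsg);
           System.out.println(lenient.errorMsg);
           System.out.println(failed.errorMsg);
       }
   }
   ```
   
   In the actual patch the equivalent would just be the same `LOGGER.debug` 
call in the `else` branch and in the `catch` block; the helper only makes the 
shared behaviour explicit.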



##########
external/storm-kafka-monitor/src/test/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtilTest.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * and limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Testcontainers

Review Comment:
   Should be `@Testcontainers(disabledWithoutDocker = true)`.
   
   With bare `@Testcontainers`, this test hard-fails on any environment without 
a Docker daemon (CI runners, sandboxed builds, contributor laptops without 
Docker Desktop). With `disabledWithoutDocker = true` the JUnit engine *skips* 
the test instead of failing it.
   
   The Storm codebase already has precedent for this — 
`storm-metrics-prometheus`:
   
   ```java
   // external/storm-metrics-prometheus/src/test/java/.../PrometheusPreparableReporterTest.java:43
   @Testcontainers(disabledWithoutDocker = true)
   ```
   
   (Note: `storm-redis/.../RedisFilterBoltTest.java:58` uses bare 
`@Testcontainers` — that's an older pattern that probably should also be 
updated, but out of scope for this PR.)
   
   Also note that a Kafka container cold start takes roughly 10–20s. It would 
help to state in the test class's Javadoc that this is an integration test 
gated on Docker availability, so a future contributor running `mvn test` on the 
module without Docker understands why it is skipped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
