rzo1 commented on code in PR #8589:
URL: https://github.com/apache/storm/pull/8589#discussion_r3201180031
##########
external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java:
##########
@@ -172,12 +172,13 @@ public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery
}
}
consumer.assign(topicPartitionList);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(new HashSet<>(topicPartitionList));
+ consumer.seekToEnd(topicPartitionList);
for (TopicPartition topicPartition : topicPartitionList) {
- Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = consumer.committed(Collections.singleton(topicPartition));
- long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.get(topicPartition).offset() : -1;
- consumer.seekToEnd(toArrayList(topicPartition));
+ OffsetAndMetadata partitionOffset = committedOffsets.get(topicPartition);
+ long committedOffset = partitionOffset != null ? partitionOffset.offset() : -1;
result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset,
- consumer.position(topicPartition)));
+ consumer.position(topicPartition)));
Review Comment:
The fix here is good — bulk `committed(Set)` + null-check is exactly right,
and moving `seekToEnd(topicPartitionList)` out of the per-partition loop is a
nice cleanup.
My only ask is to **separate the bug fix from the formatting churn** in this
file. Outside this loop, most of the diff is reformatting (continuation indent
4 → 8, Javadoc whitespace). That makes a clean cherry-pick of the fix to
maintenance branches harder, and obscures what's actually changing for someone
bisecting later. Suggest two commits: one for the fix, one for the formatting —
or just drop the formatting touches from this PR.
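For readers without the Kafka client at hand, here is a stdlib-only sketch of the loop shape the fix moves to: one bulk lookup before the loop, then a null check on each per-partition *value* rather than on the returned map. Partitions are plain strings and offsets plain `Long`s (hypothetical stand-ins for `TopicPartition` and `OffsetAndMetadata`), `endOffsets` stands in for the single `seekToEnd(...)` followed by per-partition `position(...)`, and `LagSketch`/`PartitionOffsets` are illustrative names only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only model of the fixed loop in KafkaOffsetLagUtil.getOffsetLags.
final class LagSketch {

    /** Committed offset (or -1 if none) and log-end offset for one partition. */
    static final class PartitionOffsets {
        final long committed;
        final long logHead;

        PartitionOffsets(long committed, long logHead) {
            this.committed = committed;
            this.logHead = logHead;
        }
    }

    /**
     * @param committed  result of ONE bulk lookup, analogous to consumer.committed(Set);
     *                   get(p) returns null for a partition with nothing committed
     * @param endOffsets result of ONE bulk end-offset lookup; assumed to contain every partition
     */
    static Map<String, PartitionOffsets> offsets(List<String> partitions,
                                                 Map<String, Long> committed,
                                                 Map<String, Long> endOffsets) {
        Map<String, PartitionOffsets> result = new HashMap<>();
        for (String partition : partitions) {
            // Null-check the per-partition value. The old code only checked that the
            // returned map was non-null and then dereferenced get(p), which would throw
            // a NullPointerException for a partition with no committed offset.
            Long offset = committed.get(partition);
            long committedOffset = offset != null ? offset : -1L;
            result.put(partition, new PartitionOffsets(committedOffset, endOffsets.get(partition)));
        }
        return result;
    }
}
```

Hoisting the lookup out of the loop also means the committed offsets are requested once for all partitions instead of once per partition, which is the same reason `seekToEnd(topicPartitionList)` can move out.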
##########
storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java:
##########
@@ -171,10 +171,15 @@ private static Map<String, Object> getLagResultForKafka(String spoutId, SpoutSpe
String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
try {
- result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
+ Object parsed = JSONValue.parseWithException(resultFromMonitor);
+ if (parsed instanceof Map) {
+ result = (Map<String, Object>) parsed;
+ } else {
+ // json-smart parses unquoted plain text leniently as a String, so we can land here
+ // when the monitor printed an error message instead of JSON; surface it as the error.
+ errorMsg = resultFromMonitor;
+ }
} catch (ParseException e) {
- LOGGER.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
- // json parsing fail -> error received
- // json parsing fail -> error received
errorMsg = resultFromMonitor;
Review Comment:
Tiny nit: the previous code had a
`LOGGER.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor)`
here. Now that the new `instanceof Map` path silently falls through to
`errorMsg` *and* the `ParseException` path silently does too, there's no log
line that captures "the monitor said something we couldn't parse."
The error does propagate via `errorMsg`, so users see it, but losing the
debug log makes future debugging harder. Suggest emitting the same debug-level
line in both branches, e.g.
`LOGGER.debug("Monitor returned non-JSON output, treating as error: {}", resultFromMonitor);`,
which gives you the same observability you had before at negligible cost.
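A minimal sketch of that suggestion, routing both fallback paths through one helper so the debug line cannot drift between branches. It assumes java.util.logging (`Level.FINE`) in place of the SLF4J `LOGGER.debug` used in `TopologySpoutLag`, and `JsonParser`, `ParseException`, `Outcome` and `MonitorOutputSketch` are hypothetical stand-ins, not the JSON library's real types.

```java
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stdlib-only model of the parse/fallback logic in TopologySpoutLag.
final class MonitorOutputSketch {
    // java.util.logging stands in for SLF4J here; FINE roughly corresponds to debug.
    private static final Logger LOGGER = Logger.getLogger(MonitorOutputSketch.class.getName());

    /** Hypothetical stand-in for the JSON library's checked ParseException. */
    static final class ParseException extends Exception {
        ParseException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for JSONValue.parseWithException(String). */
    interface JsonParser {
        Object parse(String text) throws ParseException;
    }

    /** Exactly one of result and errorMsg is non-null. */
    static final class Outcome {
        final Map<String, Object> result;
        final String errorMsg;

        Outcome(Map<String, Object> result, String errorMsg) {
            this.result = result;
            this.errorMsg = errorMsg;
        }
    }

    @SuppressWarnings("unchecked")
    static Outcome interpret(String resultFromMonitor, JsonParser parser) {
        Object parsed;
        try {
            parsed = parser.parse(resultFromMonitor);
        } catch (ParseException e) {
            return asError(resultFromMonitor); // not JSON at all
        }
        if (parsed instanceof Map) {
            return new Outcome((Map<String, Object>) parsed, null);
        }
        return asError(resultFromMonitor); // parsed leniently, e.g. as a String
    }

    // The single place that logs, so both fallback paths stay observable.
    private static Outcome asError(String resultFromMonitor) {
        LOGGER.log(Level.FINE, "Monitor returned non-JSON output, treating as error: {0}", resultFromMonitor);
        return new Outcome(null, resultFromMonitor);
    }
}
```

Narrowing the `try` to the parse call is a choice of this sketch; keeping the `instanceof` check inside the `try`, as the PR does, works just as well with one log statement added to each branch.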
##########
external/storm-kafka-monitor/src/test/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtilTest.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Testcontainers
Review Comment:
Should be `@Testcontainers(disabledWithoutDocker = true)`.
With bare `@Testcontainers`, this test hard-fails in any environment without
a reachable Docker daemon (some CI runners, sandboxed builds, contributor
laptops without Docker Desktop). With `disabledWithoutDocker = true`, the
Testcontainers extension disables the test class instead, so JUnit reports it
as *skipped* rather than failed.
The Storm codebase already has precedent for this —
`storm-metrics-prometheus`:
```java
// external/storm-metrics-prometheus/src/test/java/.../PrometheusPreparableReporterTest.java:43
@Testcontainers(disabledWithoutDocker = true)
```
(Note: `storm-redis/.../RedisFilterBoltTest.java:58` uses bare
`@Testcontainers`; that's an older pattern that should probably be updated as
well, but it's out of scope for this PR.)
Also worth noting: a Kafka container cold start takes ~10–20s. It would be
worth stating in the test class's Javadoc that this is an integration test gated
on Docker availability, so a future contributor running `mvn test` on the module
without Docker understands why it's skipped.
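The class header described above might look like the fragment below. It shows only the annotation and a Javadoc note; the Javadoc wording is a suggestion, not text from the PR, and the fragment relies on the module's existing JUnit Jupiter and Testcontainers test dependencies (already imported by the file under review), so it is not standalone.

```java
package org.apache.storm.kafka.monitor;

import org.testcontainers.junit.jupiter.Testcontainers;

/**
 * Integration test for {@link KafkaOffsetLagUtil} against a real Kafka broker
 * started via Testcontainers.
 *
 * <p>Requires a reachable Docker daemon. Without Docker (for example when running
 * {@code mvn test} on a machine without Docker), the whole class is reported as
 * skipped rather than failed because of {@code disabledWithoutDocker = true}.
 * Expect the Kafka container cold start to add roughly 10-20s to the run.
 */
@Testcontainers(disabledWithoutDocker = true)
class KafkaOffsetLagUtilTest {
    // @Container field, @BeforeAll setup and @Test methods as in the PR.
}
```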
--
This is an automated message from the Apache Git Service.