DiogoP98 opened a new issue, #8588:
URL: https://github.com/apache/storm/issues/8588

   ### Description
   
   After upgrading from Apache Storm 2.8.0 to 2.8.7, the "Kafka Spouts Lag" 
panel in the Storm UI stops working for many topologies. The UI displays no lag 
information for affected spouts, and `nimbus.log` / `ui.log` are flooded with 
`ClassCastException` warnings (with a null exception message) for every Kafka 
spout in those topologies.
   
   ### To Reproduce
   
   1. Run a topology containing one or more `KafkaSpout` instances.
   2. Make sure that at least one of the spout's subscribed topic-partitions has no committed offset yet for the consumer group (e.g., a brand-new consumer group, a passive replication topic, or an idle partition).
   3. Open the topology page in Storm UI and look at the "Kafka Spouts Lag" 
panel.
   4. Observe that the panel shows no data for that spout.
   5. Inspect the UI/Nimbus logs.
   
   ### Expected behavior
   
   The lag panel reports per-partition lag for each Kafka spout. Partitions 
with no committed offset are reported gracefully (e.g., as -1) without breaking 
the rest of the spout's lag query. No `ClassCastException` warnings appear in 
the logs.
   
   ### Actual behavior
   
   For every affected spout, the following warning is logged on each lag 
refresh:
   ```
   TopologySpoutLag [WARN] Exception thrown while getting lag for spout id: <spoutId>
   TopologySpoutLag [WARN] Exception message:null
   java.lang.ClassCastException
   ```
   
   The `spoutLagResult` for those spouts is missing entirely from the `/api/v1/topology/<id>/lag` response, and the UI's "Kafka Spouts Lag" panel stays empty.
   
   ### Identified root cause
   
   There are two interacting bugs, both exposed by the Kafka client upgrade from 3.9.0 to 4.x (commit 2958d3ff8):
   
     1. **KafkaOffsetLagUtil.getOffsetLags** (`storm-kafka-monitor`): the [Kafka 4 API replaced](https://kafka.apache.org/40/getting-started/upgrade/#:~:text=timeout%2E-,The,instead) `consumer.committed(TopicPartition)` (which returned `OffsetAndMetadata` or `null`) with `consumer.committed(Set<TopicPartition>)` (which returns a `Map`). The migration kept the old null-check shape, but applied it to the wrong object:
        ```java
        // Bug: null-checks the map, but get(topicPartition) can return null
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = consumer.committed(Collections.singleton(topicPartition));
        long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.get(topicPartition).offset() : -1;
        ```
        The map itself is never null, but its value for a partition without a committed offset is. Calling `.offset()` on that null value throws a `NullPointerException`, which is caught in the monitor's `main()`, so the monitor emits a plain-text error message instead of JSON.
     2. **TopologySpoutLag.getLagResultForKafka** (`storm-core`): captures the monitor's stdout and parses it with `JSONValue.parseWithException`, expecting either a JSON object or a `ParseException`. However, json-smart leniently parses unquoted plain text as a `java.lang.String` rather than throwing `ParseException`. The unchecked cast `(Map<String, Object>) parsedResult` then throws a `ClassCastException` (with a null detail message on some JVMs), which is caught by the outer handler in `lag()` and surfaces as the warning above. The original error text from the monitor is lost.
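
The failure chain can be reproduced without a Kafka broker or json-smart. The sketch below is a self-contained approximation, not Storm code: `OffsetAndMetadata` is a hypothetical stand-in record for Kafka's class, partitions are plain `String` keys instead of `TopicPartition`, and a hard-coded `String` stands in for what a lenient parser returns for plain-text monitor output. It shows that null-checking the map does not guard against a null value, and that the unchecked cast fails when the parse result is not a map.

```java
import java.util.HashMap;
import java.util.Map;

public class CommittedOffsetNullDemo {
    // Hypothetical stand-in for Kafka's OffsetAndMetadata; only offset() matters here.
    record OffsetAndMetadata(long offset) {}

    // Same shape as the migrated code: guards the map, then dereferences the value unguarded.
    static long buggyCommittedOffset(Map<String, OffsetAndMetadata> committed, String partition) {
        return committed != null ? committed.get(partition).offset() : -1;
    }

    // Same shape as the proposed fix: guards the per-partition value instead.
    static long fixedCommittedOffset(Map<String, OffsetAndMetadata> committed, String partition) {
        OffsetAndMetadata partitionOffset = committed.get(partition);
        return partitionOffset != null ? partitionOffset.offset() : -1;
    }

    public static void main(String[] args) {
        // Non-null map holding a null value, as for a partition with no committed offset.
        Map<String, OffsetAndMetadata> committed = new HashMap<>();
        committed.put("topic-0", null);
        committed.put("topic-1", new OffsetAndMetadata(42L));

        System.out.println(fixedCommittedOffset(committed, "topic-0")); // -1
        System.out.println(fixedCommittedOffset(committed, "topic-1")); // 42

        try {
            buggyCommittedOffset(committed, "topic-0");
        } catch (NullPointerException e) {
            System.out.println("map-level null check: NullPointerException");
        }

        // A lenient parser can return a String for plain-text output; the unchecked cast then fails.
        Object parsedResult = "plain-text error from the monitor";
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> result = (Map<String, Object>) parsedResult;
            System.out.println(result.size());
        } catch (ClassCastException e) {
            System.out.println("unchecked cast: ClassCastException");
        }
    }
}
```

Running `main` prints `-1` and `42` for the value-level check, then reports the `NullPointerException` from the map-level check and the `ClassCastException` from the cast.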
   
   In previous Storm versions, which used Kafka client 3.9.0, `committed(TopicPartition)` correctly returned `null` for a partition with no committed offset, so the existing null-check worked and the chain never broke. This is why the problem only surfaces after the 4.x upgrade.
   
   ### Proposed fix
   
     1. In `KafkaOffsetLagUtil`, null-check the value retrieved from the map, not the map itself:
        ```java
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Collections.singleton(topicPartition));
        // null when no offset has been committed for this partition
        OffsetAndMetadata partitionOffset = committed.get(topicPartition);
        long committedOffset = partitionOffset != null ? partitionOffset.offset() : -1;
        ```
     2. In `TopologySpoutLag`, defensively handle the case where `JSONValue.parseWithException` succeeds but yields a non-`Map` (e.g., a `String` from leniently parsed error text). Treat that as a monitor failure and surface the raw stdout as the spout's `errorInfo`, instead of letting the cast throw.
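
The second fix could look roughly like the following sketch. It is not the actual `TopologySpoutLag` code: `JsonParser` is a hypothetical functional interface standing in for `JSONValue.parseWithException`, `toLagResult` is a hypothetical helper, and the `spoutLagResult`/`errorInfo` keys follow the names used in this issue rather than a verified result schema. The idea is to check the parsed value with `instanceof` before casting, so that a lenient parse of plain text is reported as a monitor failure carrying the raw stdout.

```java
import java.util.HashMap;
import java.util.Map;

public class MonitorOutputHandling {
    // Hypothetical stand-in for JSONValue.parseWithException, which can throw a checked ParseException.
    @FunctionalInterface
    interface JsonParser {
        Object parse(String text) throws Exception;
    }

    // Hypothetical helper: returns the parsed lag map under "spoutLagResult",
    // or the raw monitor output under "errorInfo" when the output is not a JSON object.
    static Map<String, Object> toLagResult(String spoutId, String stdout, JsonParser parser) {
        Map<String, Object> result = new HashMap<>();
        result.put("spoutId", spoutId);
        Object parsed;
        try {
            parsed = parser.parse(stdout);
        } catch (Exception e) {
            // Parsing failed outright: surface the raw output.
            result.put("errorInfo", stdout);
            return result;
        }
        if (parsed instanceof Map<?, ?> lagMap) {
            result.put("spoutLagResult", lagMap);
        } else {
            // Lenient parse of plain text (e.g. a String): treat it as a monitor failure.
            result.put("errorInfo", stdout);
        }
        return result;
    }

    public static void main(String[] args) {
        // Simulated lenient parser: input starting with '{' becomes a Map, anything else stays a String.
        JsonParser lenient = text -> text.startsWith("{") ? Map.of("topic-0", 5L) : text;

        System.out.println(toLagResult("kafkaSpout", "{\"topic-0\": 5}", lenient).containsKey("spoutLagResult")); // true
        System.out.println(toLagResult("kafkaSpout", "plain-text monitor error", lenient).get("errorInfo")); // plain-text monitor error
    }
}
```

Checking with `instanceof` instead of catching `ClassCastException` keeps the raw monitor output available for `errorInfo` and does not depend on an exception message that may be null.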
   
   With these changes, uncommitted partitions report -1 instead of breaking the whole spout's lag query, and any future monitor error reaches the UI as a useful error message instead of being silently lost.
   
   ### Environment
   
   - Apache Storm 2.8.7
   - Java 17

