[ https://issues.apache.org/jira/browse/KAFKA-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suresh Rukmangathan updated KAFKA-14184:
----------------------------------------
    Description: 
The Kafka Streams application is crashing with the following stack trace; three application frames that process records and write to the state store have been removed:
{code:java}
java.lang.UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks.
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.throwUnsupportedOperationExceptionIfStandby(ProcessorContextImpl.java:352)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:328)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:136)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:78)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:197)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:197)
	at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120)
	// app calls to process & save to state store - 3 frames removed
	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
{code}
 

Key Kafka Streams application configuration settings:
{code:java}
{replication.factor=1, num.standby.replicas=1, topology.optimization=all, max.request.size=1048576, auto.offset.reset=earliest}{code}
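For reference, a minimal sketch (not the actual application) of how these settings would be applied when constructing the KafkaStreams instance; the application id, bootstrap servers, class name, and topology are placeholders:
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StandbyConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Placeholders, not from this report:
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Settings from this report:
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put("topology.optimization", "all");
        props.put("max.request.size", 1048576);      // forwarded to the embedded producer
        props.put("auto.offset.reset", "earliest");  // forwarded to the embedded consumer

        final StreamsBuilder builder = new StreamsBuilder();
        // ... topology with a changelogged key-value state store elided ...
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
{code}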
 

Is it a problem to run with replication.factor=1 and num.standby.replicas=1? Should the replication factor be at least n+1 when num.standby.replicas=n, or is there no relationship between the two?

 

A couple more data points:
 # The crash stopped once I set num.standby.replicas to 0.
 # The crash also stopped once I reduced the number of application instances to one (a single pod, where one pod runs one instance of the application).

 

So, is something incorrect in the way we have configured the Kafka Streams application, and/or is Kafka mishandling standby tasks during state-store replica/standby handling (for example, calling put on a store from the standby context when it is not supposed to)?
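For reference, a hypothetical reconstruction of the kind of logic in the three removed application frames, inferred from the stack trace (the store name, key/value types, and class name are placeholders, not the actual app code): a processor that writes each record to a changelogged key-value store from process().
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical sketch only; "my-store" and the String types are placeholders.
public class SaveToStoreProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, String>) context.getStateStore("my-store");
    }

    @Override
    public void process(final String key, final String value) {
        // On an active task this put is change-logged via context.timestamp();
        // per the stack trace, the same put on a standby task's context throws
        // UnsupportedOperationException.
        store.put(key, value);
    }
}
{code}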

We don't want to lose the standby replica, as it enables faster recovery of consumers; setting num.standby.replicas=0 effectively forgoes that feature, so it is not an option.

Is there a way to debug this further by enabling additional logging in Kafka (for example, via an environment variable or logging configuration) to isolate the issue? I'm also not sure whether this issue is specific to 2.7.0 and resolved in later releases.
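For instance, assuming the application uses the stock log4j 1.x setup that Kafka 2.7 ships with, verbosity for the task and state-store internals can typically be raised with logger entries like these (logger names taken from the packages in the stack trace):
{code}
# log4j.properties - raise log verbosity for Kafka Streams internals only
log4j.logger.org.apache.kafka.streams=DEBUG
log4j.logger.org.apache.kafka.streams.processor.internals=TRACE
log4j.logger.org.apache.kafka.streams.state.internals=TRACE
{code}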



> Kafka streams application crashes due to "UnsupportedOperationException: this 
> should not happen: timestamp() is not supported in standby tasks."
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14184
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14184
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, streams
>    Affects Versions: 2.7.0
>            Reporter: Suresh Rukmangathan
>            Priority: Critical
>


