Hi there,

I'm a big fan of KStreams - thanks for all the great work!!


I unfortunately (my fault) had a StackOverflowError bug in my KStream 
transformer which meant that the KStream died without reporting any Exception 
at all.


The first log message showed some polling activity and then you see later the 
State transition to PENDING_SHUTDOWN


Main Consumer poll completed in 2 ms and fetched 1 records
Flushing all global globalStores registered in the state manager
Idempotently invoking restoration logic in state RUNNING
Finished restoring all changelogs []
Idempotent restore call done. Thread state has not changed.
Processing tasks with 1 iterations.
Flushing all global globalStores registered in the state manager
State transition from RUNNING to PENDING_SHUTDOWN



This is because the StreamThread.run() method catches Exception only.


I ended up recompiling the kstreams and changing the catch to Throwable so I 
can see what was going on. Then I discovered my bad recursive call  :(


Can we please change the Catch to catch Throwable , so that we are always 
guaranteed some output?


StreamThread.java

@Override
public void run() {
    log.info("Starting");
    if (setState(State.STARTING) == null) {
        log.info("StreamThread already shutdown. Not running");
        return;
    }
    boolean cleanRun = false;
    try {
        runLoop();
        cleanRun = true;
    } catch (final Exception e) {
        // we have caught all Kafka related exceptions, and other runtime 
exceptions
        // should be due to user application errors

        if (e instanceof UnsupportedVersionException) {
            final String errorMessage = e.getMessage();
            if (errorMessage != null &&
                errorMessage.startsWith("Broker unexpectedly doesn't support 
requireStable flag on version ")) {

                log.error("Shutting down because the Kafka cluster seems to be 
on a too old version. " +
                    "Setting {}=\"{}\" requires broker version 2.5 or higher.",
                    StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                    EXACTLY_ONCE_BETA);

                throw e;
            }
        }

        log.error("Encountered the following exception during processing " +
            "and the thread is going to shut down: ", e);
        throw e;
    } finally {
        completeShutdown(cleanRun);
    }
}


Thanks and kind regards


Scott Sinclair

Reply via email to