This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 6acd58a MINOR: Log exception thrown by consumer.poll() in
VerifiableConsumer (#6368)
6acd58a is described below
commit 6acd58a4236f5553e689a80d1bd48cd408cb0414
Author: Bob Barrett <[email protected]>
AuthorDate: Tue Mar 5 21:12:47 2019 -0500
MINOR: Log exception thrown by consumer.poll() in VerifiableConsumer (#6368)
SecurityTest.test_client_ssl_endpoint_validation_failure is failing because
it greps for 'SSLHandshakeException' in the consumer and producer log files.
With the fix for KAFKA-7773, the test uses the VerifiableConsumer instead of
the ConsoleConsumer, but the VerifiableConsumer does not log the exception
stack trace to the service log. This patch catches exceptions in the
VerifiableConsumer and logs
them in order to fix the test. Tested by running the test locally.
Reviewers: Ismael Juma <[email protected]>, Jason Gustafson
<[email protected]>
---
.../src/main/java/org/apache/kafka/tools/VerifiableConsumer.java | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 58f3471..e6955ba 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -43,6 +43,8 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
@@ -82,6 +84,8 @@ import static
net.sourceforge.argparse4j.impl.Arguments.storeTrue;
*/
public class VerifiableConsumer implements Closeable, OffsetCommitCallback,
ConsumerRebalanceListener {
+ private static final Logger log =
LoggerFactory.getLogger(VerifiableConsumer.class);
+
private final ObjectMapper mapper = new ObjectMapper();
private final PrintStream out;
private final KafkaConsumer<String, String> consumer;
@@ -233,6 +237,10 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
}
} catch (WakeupException e) {
// ignore, we are closing
+ log.trace("Caught WakeupException because consumer is shutdown,
ignore and terminate.", e);
+ } catch (Throwable t) {
+ // Log the error so it goes to the service log and not stdout
+ log.error("Error during processing, terminating consumer process:
", t);
} finally {
consumer.close();
printJson(new ShutdownComplete());