[ https://issues.apache.org/jira/browse/KAFKA-4371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15634793#comment-15634793 ]

Ewen Cheslack-Postava commented on KAFKA-4371:
----------------------------------------------

[~sagarrao] I'm a bit confused by the report. You say it takes down the whole 
connect process, but then talk about using the REST API after the failure. Does 
the process actually die, or are you just saying that it no longer continues 
to process data? If it's the former, then it'd be a bug in the framework. If 
it's the latter, this is just an issue with the JDBC connector, which is why 
[~skyahead] was saying the bug should probably be filed in the JDBC 
connector's repository.
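
In the connector-side case, the usual fix is for the task to treat transient 
DB failures as retriable rather than fatal (Kafka Connect provides 
org.apache.kafka.connect.errors.RetriableException for exactly this: a task 
can throw it from poll() and the framework will retry instead of failing the 
task). A minimal, self-contained sketch of the retry-with-backoff idea -- all 
names here are hypothetical, not the JDBC connector's actual code:

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

public class RetryingQuery {
    // Retry a transient operation a few times before giving up, so a
    // momentary "Communications link failure" does not permanently fail
    // the task. Hypothetical helper, for illustration only.
    static <T> T withRetries(Callable<T> op, int maxAttempts, long backoffMs)
            throws Exception {
        SQLException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (SQLException e) {
                last = e;
                Thread.sleep(backoffMs * attempt); // linear backoff
            }
        }
        throw last; // retries exhausted: surface the original failure
    }

    public static void main(String[] args) throws Exception {
        // Simulated query that fails twice with a transient error, then succeeds.
        final int[] calls = {0};
        String result = withRetries(() -> {
            if (++calls[0] < 3) throw new SQLException("Communications link failure");
            return "2016-11-02 12:42:06";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a real connector the catch block would typically inspect the SQLException 
(e.g. distinguish communication failures from authorization errors) and 
re-establish the connection before retrying, since a broken connection's 
statements stay unusable ("No operations allowed after statement closed").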


> Sporadic ConnectException shuts down the whole connect process
> --------------------------------------------------------------
>
>                 Key: KAFKA-4371
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4371
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Sagar Rao
>            Priority: Critical
>
> I had set up a 2-node distributed Kafka Connect process. Everything went 
> well, and I could see a lot of data flowing into the relevant Kafka topics.
> After some time, JDBCUtils.getCurrentTimeOnDB threw a ConnectException with 
> the following stacktrace:
> The last packet successfully received from the server was 792 milliseconds ago.  The last packet sent successfully to the server was 286 milliseconds ago. (io.confluent.connect.jdbc.source.JdbcSourceTask:234)
> [2016-11-02 12:42:06,116] ERROR Failed to get current time from DB using query select CURRENT_TIMESTAMP; on database MySQL (io.confluent.connect.jdbc.util.JdbcUtils:226)
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
> The last packet successfully received from the server was 1,855 milliseconds ago.  The last packet sent successfully to the server was 557 milliseconds ago.
>        at sun.reflect.GeneratedConstructorAccessor51.newInstance(Unknown Source)
>        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>        at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>        at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1117)
>        at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3829)
>        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2449)
>        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2629)
>        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2719)
>        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2155)
>        at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1379)
>        at com.mysql.jdbc.StatementImpl.createResultSetUsingServerFetch(StatementImpl.java:651)
>        at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1527)
>        at io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:220)
>        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:157)
>        at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:78)
>        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:57)
>        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:207)
>        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
>        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>        at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.SocketException: Broken pipe (Write failed)
>        at java.net.SocketOutputStream.socketWrite0(Native Method)
>        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>        at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>        at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3810)
>        ... 20 more
> This was just a minor glitch in the connection, as the EC2 instances are 
> able to connect to the MySQL Aurora instances without any issues.
> But after this exception (which appears a number of times in the logs), none 
> of the connectors' tasks are executing. Beyond this, all I see in the logs is
> [2016-11-02 16:17:41,983] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='eng_match_series', query='null', topicPrefix='ci-eng-', timestampColumn='modified', incrementingColumn='id'}: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No operations allowed after statement closed. (io.confluent.connect.jdbc.source.JdbcSourceTask:234)
> Is this expected behaviour? I restarted the connector using the REST API, 
> but that didn't help. How do we handle such scenarios? 
> Eventually I had to delete the connector and restart it.
> The Kafka version I am using is 0.10.0.1-cp1, as there were some custom 
> changes we needed to make at the Connect level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
