[ 
https://issues.apache.org/jira/browse/FLINK-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106389#comment-17106389
 ] 

Oisín Mac Fhearaí commented on FLINK-17493:
-------------------------------------------

This seems to be affecting us as well, in Flink 1.10. I can run a job once and 
write to the Cassandra sink.

If I cancel the job and start it again, this happens:

{code}
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /172.31.xxx.xx1:9042 (com.datastax.driver.core.exceptions.TransportException: [/172.31.xxx.xx1] Error writing), /172.31.xxx.xx2:9042 (com.datastax.driver.core.exceptions.TransportException: [/172.31.xxx.xx2] Error writing), /172.31.xxx.xx3:9042 (com.datastax.driver.core.exceptions.TransportException: [/172.31.xxx.xx3] Error writing))
        at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
        at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
        at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
        at com.datastax.driver.core.Cluster.init(Cluster.java:162)
        at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
        at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
        at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
        at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:143)
        at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
        at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
{code}

Non-heap memory climbs by about 33 MB each time the job restarts and never 
decreases. It started at around 100 MB and is now around 1140 MB. Direct memory 
climbed to 1.6 GB and stayed there.
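
For anyone who wants to watch the direct memory figure from inside the JVM rather than from the web UI, here is a minimal sketch using the standard `BufferPoolMXBean` from `java.lang.management`. The class name `DirectMemoryProbe` and the method `directMemoryUsed` are made up for illustration and are not part of Flink or the Cassandra driver. This only counts buffers allocated through `java.nio`, so memory a library obtains some other way may not show up here, and it is not necessarily the same number the Flink web UI reports.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Flink: reads the JVM's "direct" buffer pool usage.
class DirectMemoryProbe {

    /** Returns the bytes currently held by the "direct" buffer pool, or -1 if no such pool is exposed. */
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Allocate 1 MiB off-heap so the change is visible in the pool counter.
        ByteBuffer buf = ByteBuffer.allocateDirect(1 << 20);
        long after = directMemoryUsed();
        System.out.printf("direct pool: before=%d bytes, after=%d bytes (buffer capacity=%d)%n",
                before, after, buf.capacity());
    }
}
```

Logging this value before and after each job restart would show whether the growth is visible in the NIO direct pool or is happening somewhere the JVM does not account for.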

> Possible direct memory leak in cassandra sink
> ---------------------------------------------
>
>                 Key: FLINK-17493
>                 URL: https://issues.apache.org/jira/browse/FLINK-17493
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Cassandra
>    Affects Versions: 1.9.3, 1.10.0
>            Reporter: nobleyd
>            Priority: Major
>
> # The Cassandra sink uses direct memory.
>  # Start a standalone cluster (1 machine) for testing.
>  # After the cluster has started, open the Flink web UI and record the task 
> manager's memory info, specifically the direct memory figures.
>  # Start a job that reads from Kafka and writes to Cassandra using the 
> Cassandra sink; the direct memory count in the 'Outside JVM' section goes 
> up.
>  # Stop the job; the direct memory count does not decrease (using 'jmap 
> -histo:live pid' to force the task manager to GC).
>  # Repeat several times; the direct memory count keeps growing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)