[ 
https://issues.apache.org/jira/browse/FLINK-13059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-13059.
------------------------------------
    Fix Version/s: 1.9.1
                   1.10.0
                   1.8.2
       Resolution: Fixed

master: fbb4837a4d274d19eddbcc0a9ab96724ad8ef972
1.9: 22571aab57fc30450de8a850f1a8a6ea80fdba2c
1.8: b7ce7b8ff14807e4981591a7e26c99d5051d529f

> Cassandra Connector leaks Semaphore on Exception; hangs on close
> ----------------------------------------------------------------
>
>                 Key: FLINK-13059
>                 URL: https://issues.apache.org/jira/browse/FLINK-13059
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Cassandra
>    Affects Versions: 1.8.0
>            Reporter: Mads Chr. Olesen
>            Assignee: Mads Chr. Olesen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.2, 1.10.0, 1.9.1
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> In CassandraSinkBase the following code is present (comments are mine):
>  
> {code:java}
> public void invoke(IN value) throws Exception {
>    checkAsyncErrors();
>    tryAcquire();
>    //Semaphore held here
>    final ListenableFuture<V> result = send(value);
>    Futures.addCallback(result, callback); //Callback releases semaphore
> }{code}
> Any exception thrown inside send(value) leaves the semaphore unreleased, 
> because the callback that releases it is never registered. Such exceptions 
> are possible, e.g.:
> {code:java}
> com.datastax.driver.core.exceptions.InvalidQueryException: Some partition key parts are missing: hest
> at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
> at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:98)
> at com.datastax.driver.mapping.Mapper.getPreparedQuery(Mapper.java:118)
> at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:201)
> at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
> at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:128)
> at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:131)
> at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> {code}
> Because the semaphore is not released, when the exception bubbles out and 
> causes the job to close, CassandraSinkBase.flush() is eventually called. 
> flush() then blocks forever trying to acquire 
> config.getMaxConcurrentRequests() permits from the semaphore, which has one 
> fewer than that available.
> The Flink job is thus left half-way closed, but still marked as "RUNNING". 
> Checkpointing, however, fails with:
> {noformat}
> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 201325 of job XXX.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: XXX (3/4) was not running
> {noformat}
>  
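
The leak described in the quoted report can be reproduced outside Flink with a plain java.util.concurrent.Semaphore. The sketch below is only an illustration under stated assumptions: the class SemaphoreLeakSketch, its fields, and its methods are hypothetical stand-ins for CassandraSinkBase, and the try/catch release is one possible remedy, not necessarily what the commits listed above do. The flush here uses a timeout so that the demonstration terminates instead of hanging.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for CassandraSinkBase (not Flink code). */
class SemaphoreLeakSketch {
    static final int MAX_CONCURRENT_REQUESTS = 2;

    private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS);
    private final boolean releaseOnFailure;

    SemaphoreLeakSketch(boolean releaseOnFailure) {
        this.releaseOnFailure = releaseOnFailure;
    }

    /** Stand-in for send(value): fails synchronously, before any future or callback exists. */
    private void send(String value) {
        throw new IllegalStateException("Some partition key parts are missing: " + value);
    }

    void invoke(String value) throws InterruptedException {
        semaphore.acquire(); // permit is held from here on
        try {
            send(value);
            // In the real sink, a callback on the returned future would release the permit.
        } catch (RuntimeException e) {
            if (releaseOnFailure) {
                semaphore.release(); // no callback was registered, so release here
            }
            throw e;
        }
    }

    /** Like flush(), but bounded by a timeout so the sketch cannot hang. */
    boolean flush(long timeoutMillis) throws InterruptedException {
        boolean acquired =
            semaphore.tryAcquire(MAX_CONCURRENT_REQUESTS, timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            semaphore.release(MAX_CONCURRENT_REQUESTS);
        }
        return acquired;
    }

    /** Runs one failing invoke() followed by flush(); returns whether flush completed. */
    static boolean flushSucceedsAfterFailedSend(boolean releaseOnFailure) {
        SemaphoreLeakSketch sink = new SemaphoreLeakSketch(releaseOnFailure);
        try {
            try {
                sink.invoke("hest");
            } catch (IllegalStateException expected) {
                // the job would now start shutting down and call flush()
            }
            return sink.flush(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("leaky: flush completed = " + flushSucceedsAfterFailedSend(false));
        System.out.println("fixed: flush completed = " + flushSucceedsAfterFailedSend(true));
    }
}
```

With releaseOnFailure set to false, one permit is never returned, so flush cannot collect all MAX_CONCURRENT_REQUESTS permits and times out; an untimed acquire in the same position would block indefinitely, which matches the hang on close in the report.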



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
