[ https://issues.apache.org/jira/browse/FLINK-13059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler reassigned FLINK-13059:
----------------------------------------

    Assignee: Mads Chr. Olesen

> Cassandra Connector leaks Semaphore on Exception; hangs on close
> ----------------------------------------------------------------
>
>                 Key: FLINK-13059
>                 URL: https://issues.apache.org/jira/browse/FLINK-13059
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Cassandra
>    Affects Versions: 1.8.0
>            Reporter: Mads Chr. Olesen
>            Assignee: Mads Chr. Olesen
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In CassandraSinkBase the following code is present (comments are mine):
>  
> {code:java}
> public void invoke(IN value) throws Exception {
>    checkAsyncErrors();
>    tryAcquire();
>    //Semaphore held here
>    final ListenableFuture<V> result = send(value);
>    Futures.addCallback(result, callback); //Callback releases semaphore
> }{code}
> If send(value) throws, the callback is never registered, so the semaphore 
> permit is never released. Such exceptions do occur, e.g.:
> {code:java}
> com.datastax.driver.core.exceptions.InvalidQueryException: Some partition key parts are missing: hest
> at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
> at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:98)
> at com.datastax.driver.mapping.Mapper.getPreparedQuery(Mapper.java:118)
> at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:201)
> at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
> at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:128)
> at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:131)
> at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> {code}
> Because the permit is never released, when the exception bubbles out and 
> causes the job to close, CassandraSinkBase.flush() is eventually called. 
> flush() then blocks forever trying to acquire 
> config.getMaxConcurrentRequests() permits from the semaphore, which has one 
> fewer than that available.
> The Flink job is thus left half-closed but still marked as "RUNNING". 
> Checkpointing, however, fails with:
> {noformat}
> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 201325 of job XXX.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: XXX (3/4) was not running
> {noformat}
>  
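
The leak described in the quoted issue could be avoided by releasing the permit whenever send() fails before a callback is attached. The following is only a minimal sketch of that idea, not the actual Flink patch: GuardedSinkSketch is a hypothetical stand-in for CassandraSinkBase, CompletableFuture replaces Guava's ListenableFuture to keep the example self-contained, and the timeout on flush() is an assumption added so that a leaked permit shows up as a false return value instead of a hang.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical stand-in for CassandraSinkBase; illustrates the guard only. */
class GuardedSinkSketch<IN> {
    private final int maxConcurrentRequests;
    private final Semaphore semaphore;
    // Stand-in for the abstract send(value) method of the real sink.
    private final Function<IN, CompletableFuture<Void>> sender;

    GuardedSinkSketch(int maxConcurrentRequests,
                      Function<IN, CompletableFuture<Void>> sender) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.sender = sender;
    }

    void invoke(IN value) throws Exception {
        semaphore.acquire();
        final CompletableFuture<Void> result;
        try {
            result = sender.apply(value);
        } catch (Throwable t) {
            // send() failed synchronously: no callback will ever run,
            // so the permit has to be returned here.
            semaphore.release();
            throw t;
        }
        // Normal path: the completion callback returns the permit.
        result.whenComplete((ignored, error) -> semaphore.release());
    }

    /**
     * Mimics flush(): tries to collect every permit, which only succeeds
     * if no permit has leaked. Returns false on timeout instead of hanging.
     */
    boolean flush(long timeoutMillis) throws InterruptedException {
        boolean acquired = semaphore.tryAcquire(
                maxConcurrentRequests, timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            semaphore.release(maxConcurrentRequests);
        }
        return acquired;
    }
}
```

With this guard, a sender that throws still leaves all permits available, so a subsequent flush(100) returns true. Without the catch block, the same sequence would leave one permit missing and the flush would time out.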



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
