[ https://issues.apache.org/jira/browse/FLINK-13059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler reassigned FLINK-13059:
----------------------------------------
Assignee: Mads Chr. Olesen
> Cassandra Connector leaks Semaphore on Exception; hangs on close
> ----------------------------------------------------------------
>
> Key: FLINK-13059
> URL: https://issues.apache.org/jira/browse/FLINK-13059
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Cassandra
> Affects Versions: 1.8.0
> Reporter: Mads Chr. Olesen
> Assignee: Mads Chr. Olesen
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> CassandraSinkBase contains the following code (the comments are mine):
>
> {code:java}
> public void invoke(IN value) throws Exception {
>     checkAsyncErrors();
>     tryAcquire();
>     // Semaphore held here
>     final ListenableFuture<V> result = send(value);
>     Futures.addCallback(result, callback); // Callback releases semaphore
> }
> {code}
> Any exception thrown inside send(value) leaves the semaphore permit
> unreleased, because the callback that would release it is never registered.
> Such exceptions do occur, for example:
> {code:java}
> com.datastax.driver.core.exceptions.InvalidQueryException: Some partition key parts are missing: hest
> 	at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
> 	at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> 	at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:98)
> 	at com.datastax.driver.mapping.Mapper.getPreparedQuery(Mapper.java:118)
> 	at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:201)
> 	at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
> 	at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:128)
> 	at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:131)
> 	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> {code}
> Because the permit is never released, once the exception propagates and
> causes the job to close, CassandraSinkBase.flush() is eventually called.
> flush() then deadlocks trying to acquire config.getMaxConcurrentRequests()
> permits from the semaphore, which has one fewer than that available.
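The hang can be reproduced in isolation with a plain java.util.concurrent.Semaphore. The sketch below is a hypothetical, stripped-down model of the permit handling described above, not Flink code: the class LeakySink, its invoke(boolean) and flush(long) methods, and the timeout are illustrative assumptions. One failed send leaves maxConcurrentRequests - 1 permits, so acquiring all of them in flush() can never succeed.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the permit handling reported for
// CassandraSinkBase; names and structure are illustrative, not Flink's.
class LeakySink {
    final int maxConcurrentRequests;
    final Semaphore semaphore;

    LeakySink(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.semaphore = new Semaphore(maxConcurrentRequests);
    }

    // Like invoke(): take a permit, then call a send() that may throw.
    void invoke(boolean sendThrows) {
        semaphore.acquireUninterruptibly();
        if (sendThrows) {
            // Nothing releases the permit on this path: this is the leak.
            throw new IllegalStateException("Some partition key parts are missing");
        }
        // On success the async callback would release the permit; done inline here.
        semaphore.release();
    }

    // Like flush(): wait until every permit is back. The report describes
    // flush() as deadlocking; a timeout is used here (an assumption of this
    // sketch) so the demo reports the hang instead of blocking forever.
    boolean flush(long timeoutMillis) {
        try {
            if (semaphore.tryAcquire(maxConcurrentRequests, timeoutMillis, TimeUnit.MILLISECONDS)) {
                semaphore.release(maxConcurrentRequests);
                return true;
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LeakySink sink = new LeakySink(4);
        sink.invoke(false);
        System.out.println("flush after successful send: " + sink.flush(100));
        try {
            sink.invoke(true);
        } catch (IllegalStateException e) {
            System.out.println("send failed: " + e.getMessage());
        }
        System.out.println("available permits: " + sink.semaphore.availablePermits());
        System.out.println("flush after failed send: " + sink.flush(100));
    }
}
```

Running main reports true for the first flush, then 3 available permits and false for the second flush: with one permit gone, tryAcquire(maxConcurrentRequests, ...) cannot succeed, which is the reported hang, here bounded by a timeout.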
> The Flink job is thus left half-closed but still marked as "RUNNING".
> Checkpointing, however, fails with:
> {noformat}
> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 201325 of job XXX.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: XXX (3/4) was not running
> {noformat}
>
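One way to close the leak, sketched under assumptions: release the permit in invoke() when send() throws synchronously, since no callback is ever registered on that path. The class below is hypothetical and is not necessarily the change in the linked pull request. It uses java.util.concurrent.CompletableFuture in place of Guava's ListenableFuture and Futures.addCallback so that it runs without dependencies, and acquireUninterruptibly() as a simplified stand-in for tryAcquire().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch, not Flink code: CompletableFuture stands in for
// Guava's ListenableFuture, and `send` stands in for CassandraSinkBase.send().
class PermitSafeSink<IN> {
    private final Semaphore semaphore;
    private final Function<IN, CompletableFuture<Void>> send;

    PermitSafeSink(int maxConcurrentRequests, Function<IN, CompletableFuture<Void>> send) {
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.send = send;
    }

    void invoke(IN value) {
        semaphore.acquireUninterruptibly(); // simplified stand-in for tryAcquire()
        final CompletableFuture<Void> result;
        try {
            result = send.apply(value);
        } catch (RuntimeException | Error e) {
            // send() failed before returning a future, so no completion
            // callback will ever run: give the permit back, then rethrow.
            semaphore.release();
            throw e;
        }
        // Normal path: the permit is released when the request completes,
        // whether it succeeds or fails.
        result.whenComplete((ignored, error) -> semaphore.release());
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        PermitSafeSink<String> sink = new PermitSafeSink<>(4, value -> {
            if (value.isEmpty()) {
                throw new IllegalArgumentException("Some partition key parts are missing");
            }
            return CompletableFuture.completedFuture(null);
        });
        sink.invoke("ok");
        try {
            sink.invoke("");
        } catch (IllegalArgumentException e) {
            System.out.println("send failed: " + e.getMessage());
        }
        // All four permits are available again, so a flush() would not hang.
        System.out.println("available permits: " + sink.availablePermits());
    }
}
```

The release sits in a catch block rather than a finally block on purpose: on the normal path the permit must stay held until the asynchronous completion fires, otherwise the semaphore would no longer bound the number of in-flight requests.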
--
This message was sent by Atlassian Jira
(v8.3.2#803003)