[ https://issues.apache.org/jira/browse/FLINK-13059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler closed FLINK-13059.
------------------------------------
Fix Version/s: 1.9.1, 1.10.0, 1.8.2
Resolution: Fixed
master: fbb4837a4d274d19eddbcc0a9ab96724ad8ef972
1.9: 22571aab57fc30450de8a850f1a8a6ea80fdba2c
1.8: b7ce7b8ff14807e4981591a7e26c99d5051d529f
> Cassandra Connector leaks Semaphore on Exception; hangs on close
> ----------------------------------------------------------------
>
> Key: FLINK-13059
> URL: https://issues.apache.org/jira/browse/FLINK-13059
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Cassandra
> Affects Versions: 1.8.0
> Reporter: Mads Chr. Olesen
> Assignee: Mads Chr. Olesen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.2, 1.10.0, 1.9.1
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> In CassandraSinkBase the following code is present (comments are mine):
>
> {code:java}
> public void invoke(IN value) throws Exception {
> checkAsyncErrors();
> tryAcquire();
> //Semaphore held here
> final ListenableFuture<V> result = send(value);
> Futures.addCallback(result, callback); //Callback releases semaphore
> }{code}
> Any exception thrown inside send(value) leaves the semaphore permit
> unreleased, because the callback that would release it is never registered.
> Such exceptions do occur, e.g.
> {code:java}
> com.datastax.driver.core.exceptions.InvalidQueryException: Some partition key parts are missing: hest
> at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
> at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:98)
> at com.datastax.driver.mapping.Mapper.getPreparedQuery(Mapper.java:118)
> at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:201)
> at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
> at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:128)
> at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:131)
> at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> {code}
> Because the semaphore is not released, once the exception bubbles out and
> causes the job to close, CassandraSinkBase.flush() will eventually be called.
> flush() then deadlocks trying to acquire config.getMaxConcurrentRequests()
> permits from the semaphore, which now has one permit fewer than that available.
> The Flink job is thus left half-closed but still marked as "RUNNING".
> Checkpointing, however, fails with
> {noformat}
> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 201325 of job XXX.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: XXX (3/4) was not running
> {noformat}
>
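A minimal, dependency-free sketch of the leak and one way to guard against it, assuming the same acquire / send / register-callback order as CassandraSinkBase.invoke() in the report. The class PermitSafeSink and its methods are hypothetical: java.util.concurrent.CompletableFuture stands in for Guava's ListenableFuture, and acquireUninterruptibly() stands in for the sink's own tryAcquire(). The point it illustrates is that a permit taken before send() must also be returned on the synchronous error path, because the releasing callback is only registered after send() returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch of a sink that bounds in-flight requests with a semaphore. */
class PermitSafeSink {
    private final Semaphore semaphore;

    PermitSafeSink(int maxConcurrentRequests) {
        this.semaphore = new Semaphore(maxConcurrentRequests);
    }

    /** Hypothetical send(): throws synchronously on bad input, like a failed prepare(). */
    CompletableFuture<Void> send(String value) {
        if (value.isEmpty()) {
            throw new IllegalArgumentException("Some partition key parts are missing");
        }
        return CompletableFuture.completedFuture(null);
    }

    void invoke(String value) {
        semaphore.acquireUninterruptibly(); // permit held from here on
        final CompletableFuture<Void> result;
        try {
            result = send(value);
        } catch (RuntimeException | Error e) {
            // No callback was registered, so nobody else will return this permit
            semaphore.release();
            throw e;
        }
        // The callback returns the permit when the request completes, successfully or not
        result.whenComplete((ignored, error) -> semaphore.release());
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        PermitSafeSink sink = new PermitSafeSink(2);
        try {
            sink.invoke(""); // send() throws before any callback exists
        } catch (IllegalArgumentException e) {
            System.out.println("invoke failed: " + e.getMessage());
        }
        System.out.println("permits after failure: " + sink.availablePermits()); // 2
    }
}
```

Removing the catch block reproduces the reported behaviour: each failing invoke() then leaves availablePermits() one lower, and a later flush() can never acquire the full count.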
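The hang in flush() can be sketched the same way. This assumes flush() waits for in-flight requests by acquiring all maxConcurrentRequests permits at once; FlushSketch and its methods are hypothetical. A timed Semaphore.tryAcquire(permits, timeout, unit) is used only so the demonstration terminates; an untimed acquire with one permit leaked would block forever, which is the half-closed state described in the report.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: flush() drains the semaphore to wait for in-flight requests. */
class FlushSketch {

    /** Returns true if every permit could be acquired (nothing in flight) within the timeout. */
    static boolean flush(Semaphore semaphore, int maxConcurrentRequests, long timeoutMillis) {
        try {
            boolean drained =
                    semaphore.tryAcquire(maxConcurrentRequests, timeoutMillis, TimeUnit.MILLISECONDS);
            if (drained) {
                // Give the permits back so the semaphore stays usable afterwards
                semaphore.release(maxConcurrentRequests);
            }
            return drained;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    /** Simulates the bug: one permit is taken by a failed invoke() and never released. */
    static boolean leakOnePermitThenFlush(int maxConcurrentRequests) {
        Semaphore semaphore = new Semaphore(maxConcurrentRequests);
        semaphore.acquireUninterruptibly(); // the leaked permit
        return flush(semaphore, maxConcurrentRequests, 100);
    }

    public static void main(String[] args) {
        System.out.println("healthy flush drained: " + flush(new Semaphore(3), 3, 100)); // true
        System.out.println("flush after leak drained: " + leakOnePermitThenFlush(3)); // false
    }
}
```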
--
This message was sent by Atlassian Jira
(v8.3.2#803003)