[
https://issues.apache.org/jira/browse/FLINK-5101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834418#comment-15834418
]
ASF GitHub Bot commented on FLINK-5101:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2866#discussion_r97309628
--- Diff:
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
---
@@ -69,17 +87,27 @@ public void onFailure(Throwable t) {
@Override
public void invoke(IN value) throws Exception {
- if (exception != null) {
- throw new IOException("invoke() failed", exception);
+ Throwable e = exception.get();
+ if (e != null) {
+ throw new IOException("Error while sending value.", e);
}
ListenableFuture<V> result = send(value);
+ updatesPending.incrementAndGet();
Futures.addCallback(result, callback);
}
public abstract ListenableFuture<V> send(IN value);
@Override
public void close() {
+ while (updatesPending.get() > 0) {
+ synchronized (updatesPending) {
+ try {
+ updatesPending.wait();
+ } catch (InterruptedException e) {
+ }
--- End diff --
It would probably be even better to throw an exception here: when the
wait is interrupted, closing is incomplete and can no longer guarantee
that all pending updates were written.
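
A minimal sketch of what this suggestion could look like, not the code that was eventually merged. The counter name updatesPending comes from the diff above. The class name InterruptAwareClose and the helpers updateStarted() and updateFinished() are hypothetical stand-ins for the sink and its callback. The sketch also assumes close() may declare IOException, which the close() in the diff does not.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the sink; only the pending-update bookkeeping is modeled.
class InterruptAwareClose {

    // Same role as updatesPending in the diff: number of in-flight async writes.
    private final AtomicInteger updatesPending = new AtomicInteger();

    // Hypothetical helper: called where invoke() registers a new async write.
    void updateStarted() {
        updatesPending.incrementAndGet();
    }

    // Hypothetical helper: called from the callback on success or failure.
    void updateFinished() {
        if (updatesPending.decrementAndGet() == 0) {
            synchronized (updatesPending) {
                updatesPending.notifyAll();
            }
        }
    }

    // Waits for all pending updates; fails loudly instead of swallowing an interrupt.
    void close() throws IOException {
        synchronized (updatesPending) {
            while (updatesPending.get() > 0) {
                try {
                    updatesPending.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can still observe it.
                    Thread.currentThread().interrupt();
                    throw new IOException(
                        "Interrupted while waiting for pending updates; close() is incomplete.", e);
                }
            }
        }
    }
}
```

Checking the counter inside the synchronized block means a notification cannot slip in between the check and the call to wait(), and rethrowing surfaces the incomplete close to the caller instead of silently returning.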
> Test CassandraConnectorITCase instable
> --------------------------------------
>
> Key: FLINK-5101
> URL: https://issues.apache.org/jira/browse/FLINK-5101
> Project: Flink
> Issue Type: Bug
> Components: Cassandra Connector
> Reporter: Stefan Richter
> Assignee: Chesnay Schepler
>
> I observed this test fail on Travis (very rarely):
>
> Running org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase
> Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 80.843 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase
> testCassandraBatchFormats(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase) Time elapsed: 5.82 sec <<< FAILURE!
> java.lang.AssertionError: expected:<40> but was:<20>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:645)
> at org.junit.Assert.assertEquals(Assert.java:631)
> at org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testCassandraBatchFormats(CassandraConnectorITCase.java:442)