[ 
https://issues.apache.org/jira/browse/FLINK-22775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17446552#comment-17446552
 ] 

Etienne Chauchot edited comment on FLINK-22775 at 11/22/21, 9:03 AM:
---------------------------------------------------------------------

I took a look: it seems to be due to a load problem. When Cassandra writes data 
it waits for a given number of replicas to ack the write before responding to 
the client. In that case it writes to a node and to a replica. And it is the 
replica that did not respond in time hence the timeout at Cassandra client 
level. As the Cassandra cluster is an embedded daemon it is not very surprising 
that it is sensitive to load. I'd approach like this: 
 # first try to reduce the expectations in cassandra consistency by setting 
consistency to _ConsistencyLevel.ANY_ rather than _ConsistencyLevel.ONE_ which 
is ok for a test. Later see in time if the ITest is still flaky
 # If the test is still flaky, then migrate the cassandra test cluster from 
embedded daemon to either testContainers (relies on docker so less sensitive to 
load) or ASF v2 licenced test component such as Achilles (that I used in Apache 
Beam and that I contributed to) which has a lot of knobs for configuring the 
cluster.


was (Author: echauchot):
I took a look: it seems to be due to a load problem. When Cassandra writes data 
it waits for a given number of replicas to ack the write before responding to 
the client. In that case it writes to a node and to a replica. And it is the 
replica that did not respond in time hence the timeout at Cassandra client 
level. As the Cassandra cluster is an embedded daemon it is not very surprising 
that it is sensitive to load. I'd approach like this: 
 # first try to reduce the expectations in cassandra consistency by setting 
consistency to _ConsistencyLevel.ANY_ rather than _ConsistencyLevel.ONE_ and 
see in time if the ITest is still flaky
 # If the test is still flaky, then migrate the cassandra test cluster from 
embedded daemon to either testContainers (relies on docker so less sensitive to 
load) or ASF v2 licenced test component such as Achilles (that I used in Apache 
Beam and that I contributed to) which has a lot of knobs for configuring the 
cluster.

> CassandraConnectorITCase.testCassandraTableSink Fail
> ----------------------------------------------------
>
>                 Key: FLINK-22775
>                 URL: https://issues.apache.org/jira/browse/FLINK-22775
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Cassandra
>    Affects Versions: 1.14.0
>            Reporter: Guowei Ma
>            Assignee: Etienne Chauchot
>            Priority: Minor
>              Labels: auto-deprioritized-major, test-stability
>             Fix For: 1.15.0, 1.14.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18328&view=logs&j=ba53eb01-1462-56a3-8e98-0dd97fbcaab5&t=bfbc6239-57a0-5db0-63f3-41551b4f7d51&l=14105
> {code:java}
>  2021-05-25T23:03:44.0756266Z May 25 23:03:44 [ERROR] 
> testCassandraTableSink(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase)
>   Time elapsed: 13.673 s  <<< ERROR!
> 2021-05-25T23:03:44.0757635Z May 25 23:03:44 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> 2021-05-25T23:03:44.0760262Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2021-05-25T23:03:44.0761504Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2021-05-25T23:03:44.0762906Z May 25 23:03:44  at 
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
> 2021-05-25T23:03:44.0763878Z May 25 23:03:44  at 
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
> 2021-05-25T23:03:44.0764918Z May 25 23:03:44  at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testCassandraTableSink(CassandraConnectorITCase.java:520)
> 2021-05-25T23:03:44.0768225Z May 25 23:03:44  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-05-25T23:03:44.0769100Z May 25 23:03:44  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-05-25T23:03:44.0769917Z May 25 23:03:44  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-05-25T23:03:44.0770645Z May 25 23:03:44  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-05-25T23:03:44.0771387Z May 25 23:03:44  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-05-25T23:03:44.0772228Z May 25 23:03:44  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-05-25T23:03:44.0773541Z May 25 23:03:44  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-05-25T23:03:44.0774367Z May 25 23:03:44  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-05-25T23:03:44.0775246Z May 25 23:03:44  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-05-25T23:03:44.0776088Z May 25 23:03:44  at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:192)
> 2021-05-25T23:03:44.0776946Z May 25 23:03:44  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-05-25T23:03:44.0777685Z May 25 23:03:44  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2021-05-25T23:03:44.0778447Z May 25 23:03:44  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2021-05-25T23:03:44.0779110Z May 25 23:03:44  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-05-25T23:03:44.0779893Z May 25 23:03:44  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-05-25T23:03:44.0780744Z May 25 23:03:44  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-05-25T23:03:44.0781493Z May 25 23:03:44  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-05-25T23:03:44.0782154Z May 25 23:03:44  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-05-25T23:03:44.0782899Z May 25 23:03:44  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-05-25T23:03:44.0783576Z May 25 23:03:44  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-05-25T23:03:44.0784312Z May 25 23:03:44  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-05-25T23:03:44.0785020Z May 25 23:03:44  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-05-25T23:03:44.0785815Z May 25 23:03:44  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-05-25T23:03:44.0786619Z May 25 23:03:44  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-05-25T23:03:44.0787343Z May 25 23:03:44  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2021-05-25T23:03:44.0788202Z May 25 23:03:44  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2021-05-25T23:03:44.0789018Z May 25 23:03:44  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2021-05-25T23:03:44.0789860Z May 25 23:03:44  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> 2021-05-25T23:03:44.0790915Z May 25 23:03:44  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> 2021-05-25T23:03:44.0791764Z May 25 23:03:44  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> 2021-05-25T23:03:44.0795986Z May 25 23:03:44  at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> 2021-05-25T23:03:44.0797022Z May 25 23:03:44  at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> 2021-05-25T23:03:44.0797839Z May 25 23:03:44  at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> 2021-05-25T23:03:44.0798758Z May 25 23:03:44  at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> 2021-05-25T23:03:44.0799520Z May 25 23:03:44 Caused by: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> 2021-05-25T23:03:44.0800276Z May 25 23:03:44  at 
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
> 2021-05-25T23:03:44.0801231Z May 25 23:03:44  at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> 2021-05-25T23:03:44.0802456Z May 25 23:03:44  at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:383)
> 2021-05-25T23:03:44.0803713Z May 25 23:03:44  at 
> org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:116)
> 2021-05-25T23:03:44.0804612Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
> 2021-05-25T23:03:44.0805457Z May 25 23:03:44  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2021-05-25T23:03:44.0806266Z May 25 23:03:44  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2021-05-25T23:03:44.0806967Z May 25 23:03:44  at 
> java.lang.Thread.run(Thread.java:748)
> 2021-05-25T23:03:44.0866172Z May 25 23:03:44 Caused by: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-05-25T23:03:44.0867449Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2021-05-25T23:03:44.0868377Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2021-05-25T23:03:44.0869237Z May 25 23:03:44  at 
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
> 2021-05-25T23:03:44.0869892Z May 25 23:03:44  ... 7 more
> 2021-05-25T23:03:44.0870472Z May 25 23:03:44 Caused by: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-05-25T23:03:44.0871291Z May 25 23:03:44  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-05-25T23:03:44.0872219Z May 25 23:03:44  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
> 2021-05-25T23:03:44.0873253Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2021-05-25T23:03:44.0873845Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2021-05-25T23:03:44.0874364Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-25T23:03:44.0874886Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-05-25T23:03:44.0875451Z May 25 23:03:44  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
> 2021-05-25T23:03:44.0876134Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-05-25T23:03:44.0876678Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-05-25T23:03:44.0877195Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-25T23:03:44.0878074Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-05-25T23:03:44.0878579Z May 25 23:03:44  at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
> 2021-05-25T23:03:44.0879202Z May 25 23:03:44  at 
> akka.dispatch.OnComplete.internal(Future.scala:264)
> 2021-05-25T23:03:44.0879670Z May 25 23:03:44  at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-05-25T23:03:44.0880089Z May 25 23:03:44  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-05-25T23:03:44.0880532Z May 25 23:03:44  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-05-25T23:03:44.0880961Z May 25 23:03:44  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-05-25T23:03:44.0881476Z May 25 23:03:44  at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> 2021-05-25T23:03:44.0882021Z May 25 23:03:44  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> 2021-05-25T23:03:44.0882659Z May 25 23:03:44  at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> 2021-05-25T23:03:44.0883141Z May 25 23:03:44  at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> 2021-05-25T23:03:44.0883807Z May 25 23:03:44  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 2021-05-25T23:03:44.0884370Z May 25 23:03:44  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2021-05-25T23:03:44.0884903Z May 25 23:03:44  at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> 2021-05-25T23:03:44.0885378Z May 25 23:03:44  at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> 2021-05-25T23:03:44.0885828Z May 25 23:03:44  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-05-25T23:03:44.0886332Z May 25 23:03:44  at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2021-05-25T23:03:44.0886941Z May 25 23:03:44  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> 2021-05-25T23:03:44.0887820Z May 25 23:03:44  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2021-05-25T23:03:44.0888701Z May 25 23:03:44  at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2021-05-25T23:03:44.0889222Z May 25 23:03:44  at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> 2021-05-25T23:03:44.0889731Z May 25 23:03:44  at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> 2021-05-25T23:03:44.0890217Z May 25 23:03:44  at 
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 2021-05-25T23:03:44.0890739Z May 25 23:03:44  at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 2021-05-25T23:03:44.0891284Z May 25 23:03:44  at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2021-05-25T23:03:44.0891773Z May 25 23:03:44  at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2021-05-25T23:03:44.0892407Z May 25 23:03:44  at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2021-05-25T23:03:44.0893148Z May 25 23:03:44  at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2021-05-25T23:03:44.0893889Z May 25 23:03:44 Caused by: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> 2021-05-25T23:03:44.0894831Z May 25 23:03:44  at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> 2021-05-25T23:03:44.0895906Z May 25 23:03:44  at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> 2021-05-25T23:03:44.0896837Z May 25 23:03:44  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> 2021-05-25T23:03:44.0898172Z May 25 23:03:44  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> 2021-05-25T23:03:44.0898968Z May 25 23:03:44  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> 2021-05-25T23:03:44.0899722Z May 25 23:03:44  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> 2021-05-25T23:03:44.0900322Z May 25 23:03:44  at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
> 2021-05-25T23:03:44.0901200Z May 25 23:03:44  at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
> 2021-05-25T23:03:44.0901916Z May 25 23:03:44  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-05-25T23:03:44.0903255Z May 25 23:03:44  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-05-25T23:03:44.0904101Z May 25 23:03:44  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-05-25T23:03:44.0904577Z May 25 23:03:44  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-05-25T23:03:44.0905279Z May 25 23:03:44  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> 2021-05-25T23:03:44.0906348Z May 25 23:03:44  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> 2021-05-25T23:03:44.0907237Z May 25 23:03:44  at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> 2021-05-25T23:03:44.0908054Z May 25 23:03:44  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> 2021-05-25T23:03:44.0908759Z May 25 23:03:44  at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 2021-05-25T23:03:44.0909461Z May 25 23:03:44  at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 2021-05-25T23:03:44.0910160Z May 25 23:03:44  at 
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 2021-05-25T23:03:44.0910838Z May 25 23:03:44  at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 2021-05-25T23:03:44.0911556Z May 25 23:03:44  at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 2021-05-25T23:03:44.0912465Z May 25 23:03:44  at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2021-05-25T23:03:44.0913219Z May 25 23:03:44  at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2021-05-25T23:03:44.0913951Z May 25 23:03:44  at 
> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 2021-05-25T23:03:44.0914628Z May 25 23:03:44  at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 2021-05-25T23:03:44.0915324Z May 25 23:03:44  at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 2021-05-25T23:03:44.0915959Z May 25 23:03:44  at 
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 2021-05-25T23:03:44.0916576Z May 25 23:03:44  at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 2021-05-25T23:03:44.0916993Z May 25 23:03:44  at 
> akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 2021-05-25T23:03:44.0917377Z May 25 23:03:44  at 
> akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 2021-05-25T23:03:44.0917712Z May 25 23:03:44  ... 4 more
> 2021-05-25T23:03:44.0918361Z May 25 23:03:44 Caused by: 
> java.util.concurrent.ExecutionException: java.io.IOException: Error while 
> sending value.
> 2021-05-25T23:03:44.0919000Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2021-05-25T23:03:44.0919569Z May 25 23:03:44  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> 2021-05-25T23:03:44.0920156Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
> 2021-05-25T23:03:44.0921043Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
> 2021-05-25T23:03:44.0922194Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:135)
> 2021-05-25T23:03:44.0923156Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:135)
> 2021-05-25T23:03:44.0923976Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:135)
> 2021-05-25T23:03:44.0924785Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:429)
> 2021-05-25T23:03:44.0925329Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:690)
> 2021-05-25T23:03:44.0925970Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:641)
> 2021-05-25T23:03:44.0926619Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:646)
> 2021-05-25T23:03:44.0927160Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:619)
> 2021-05-25T23:03:44.0927650Z May 25 23:03:44  at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> 2021-05-25T23:03:44.0928212Z May 25 23:03:44  at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> 2021-05-25T23:03:44.0928778Z May 25 23:03:44  at 
> java.lang.Thread.run(Thread.java:748)
> 2021-05-25T23:03:44.0929177Z May 25 23:03:44 Caused by: java.io.IOException: 
> Error while sending value.
> 2021-05-25T23:03:44.0929915Z May 25 23:03:44  at 
> org.apache.flink.streaming.connectors.cassandra.NoOpCassandraFailureHandler.onFailure(NoOpCassandraFailureHandler.java:33)
> 2021-05-25T23:03:44.0930633Z May 25 23:03:44  at 
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.checkAsyncErrors(CassandraSinkBase.java:169)
> 2021-05-25T23:03:44.0931258Z May 25 23:03:44  at 
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:103)
> 2021-05-25T23:03:44.0931832Z May 25 23:03:44  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> 2021-05-25T23:03:44.0932610Z May 25 23:03:44  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
> 2021-05-25T23:03:44.0933249Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:213)
> 2021-05-25T23:03:44.0933970Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> 2021-05-25T23:03:44.0934671Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:210)
> 2021-05-25T23:03:44.0935319Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:185)
> 2021-05-25T23:03:44.0936064Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> 2021-05-25T23:03:44.0936840Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> 2021-05-25T23:03:44.0937623Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
> 2021-05-25T23:03:44.0938728Z May 25 23:03:44  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:162)
> 2021-05-25T23:03:44.0939446Z May 25 23:03:44  ... 12 more
> 2021-05-25T23:03:44.0940208Z May 25 23:03:44 Caused by: 
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout 
> during write query at consistency ONE (1 replica were required but only 0 
> acknowledged the write)
> 2021-05-25T23:03:44.0941288Z May 25 23:03:44  at 
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100)
> 2021-05-25T23:03:44.0942041Z May 25 23:03:44  at 
> com.datastax.driver.core.Responses$Error.asException(Responses.java:122)
> 2021-05-25T23:03:44.0942764Z May 25 23:03:44  at 
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
> 2021-05-25T23:03:44.0943324Z May 25 23:03:44  at 
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
> 2021-05-25T23:03:44.0943960Z May 25 23:03:44  at 
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
> 2021-05-25T23:03:44.0944782Z May 25 23:03:44  at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> 2021-05-25T23:03:44.0945489Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
> 2021-05-25T23:03:44.0946107Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
> 2021-05-25T23:03:44.0946713Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
> 2021-05-25T23:03:44.0947281Z May 25 23:03:44  at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> 2021-05-25T23:03:44.0948081Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
> 2021-05-25T23:03:44.0948676Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
> 2021-05-25T23:03:44.0949281Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
> 2021-05-25T23:03:44.0949866Z May 25 23:03:44  at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
> 2021-05-25T23:03:44.0950438Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
> 2021-05-25T23:03:44.0951042Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
> 2021-05-25T23:03:44.0951648Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
> 2021-05-25T23:03:44.0952212Z May 25 23:03:44  at 
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
> 2021-05-25T23:03:44.0952930Z May 25 23:03:44  at 
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:308)
> 2021-05-25T23:03:44.0953465Z May 25 23:03:44  at 
> io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:422)
> 2021-05-25T23:03:44.0954012Z May 25 23:03:44  at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
> 2021-05-25T23:03:44.0954589Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
> 2021-05-25T23:03:44.0955184Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
> 2021-05-25T23:03:44.0955784Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
> 2021-05-25T23:03:44.0956375Z May 25 23:03:44  at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
> 2021-05-25T23:03:44.0956955Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
> 2021-05-25T23:03:44.0957584Z May 25 23:03:44  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
> 2021-05-25T23:03:44.0958522Z May 25 23:03:44  at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> 2021-05-25T23:03:44.0959393Z May 25 23:03:44  at 
> io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
> 2021-05-25T23:03:44.0960464Z May 25 23:03:44  at 
> io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
> 2021-05-25T23:03:44.0961208Z May 25 23:03:44  at 
> io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> 2021-05-25T23:03:44.0961971Z May 25 23:03:44  at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> 2021-05-25T23:03:44.0962810Z May 25 23:03:44  at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> 2021-05-25T23:03:44.0963276Z May 25 23:03:44  ... 1 more
> 2021-05-25T23:03:44.0964059Z May 25 23:03:44 Caused by: 
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout 
> during write query at consistency ONE (1 replica were required but only 0 
> acknowledged the write)
> 2021-05-25T23:03:44.0965028Z May 25 23:03:44  at 
> com.datastax.driver.core.Responses$Error$1.decode(Responses.java:59)
> 2021-05-25T23:03:44.0965751Z May 25 23:03:44  at 
> com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
> 2021-05-25T23:03:44.0966501Z May 25 23:03:44  at 
> com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
> 2021-05-25T23:03:44.0967135Z May 25 23:03:44  at 
> com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
> 2021-05-25T23:03:44.0968262Z May 25 23:03:44  at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
> 2021-05-25T23:03:44.0968877Z May 25 23:03:44  ... 20 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to