[
https://issues.apache.org/jira/browse/FLINK-22803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dawid Wysakowicz closed FLINK-22803.
------------------------------------
Resolution: Invalid
> Running multiple CEP patterns
> -----------------------------
>
> Key: FLINK-22803
> URL: https://issues.apache.org/jira/browse/FLINK-22803
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.13.0
> Reporter: Tejas Budukh
> Priority: Major
>
> Hi,
> I've tried to get help with this error on Slack, the user mailing list, and
> Stack Overflow, but no one has responded. I don't know how else to get help,
> hence this ticket.
> We are running into errors when running multiple CEP patterns. Here’s our
> use case:
> We are planning to build a rule-based engine on top of Flink with a huge
> number of rules, and we are doing a POC for that. For the POC we have around
> 1000 pattern-based rules, which we translate into CEP patterns and run on a
> keyed stream of event data to detect patterns. We partition the stream by
> orgId, and each rule needs to run for each org. Here’s the code we’ve
> written to implement that:
> DataStream<Event> partitionedInput =
>     eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid);
> List<Rule> ruleList = new ArrayList<>();
> for (int i = 0; i < 100; i++) {
>     ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
>     ruleList.add(
>         new Rule("rule" + (i + 500), "process4", "process5", "process6"));
> }
> // One CEP pattern, pattern stream, and print sink is created per rule.
> for (Rule rule : ruleList) {
>     String st = rule.getStart();
>     String mi = rule.getMid();
>     String en = rule.getEnd();
>     String nm = rule.getName();
>     Pattern<Event, ?> pattern =
>         Pattern.begin(
>             Pattern.<Event>begin("start")
>                 .where(
>                     new SimpleCondition<Event>() {
>                         @Override
>                         public boolean filter(Event value) throws Exception {
>                             return value.getProcess().equals(st);
>                         }
>                     })
>                 .followedBy("middle")
>                 .where(
>                     new SimpleCondition<Event>() {
>                         @Override
>                         public boolean filter(Event event) {
>                             return !event.getProcess().equals(mi);
>                         }
>                     })
>                 .optional()
>                 .followedBy("end")
>                 .where(
>                     new SimpleCondition<Event>() {
>                         @Override
>                         public boolean filter(Event event) {
>                             return event.getProcess().equals(en);
>                         }
>                     }));
>     PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
>     DataStream<String> alerts =
>         patternStream.process(
>             new PatternProcessFunction<Event, String>() {
>                 @Override
>                 public void processMatch(
>                         Map<String, List<Event>> map, Context context,
>                         Collector<String> collector)
>                         throws Exception {
>                     Event start = map.containsKey("start") ?
>                         map.get("start").get(0) : null;
>                     Event middle = map.containsKey("middle") ?
>                         map.get("middle").get(0) : null;
>                     Event end = map.containsKey("end") ?
>                         map.get("end").get(0) : null;
>                     StringJoiner joiner = new StringJoiner(",");
>                     joiner
>                         .add("Rule : " + nm + " ")
>                         .add((start == null ? "" : start.getId()))
>                         .add((middle == null ? "" : middle.getId()))
>                         .add((end == null ? "" : end.getId()));
>                     collector.collect(joiner.toString());
>                 }
>             });
>     alerts.print();
> }
> We ran this code on a Flink cluster with one task manager and 4 task
> slots, and the task manager crashed with this error:
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>     at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>     at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
>     at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
>     at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
>     at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
>     at akka.dispatch.OnComplete.internal(Future.scala:263)
>     at akka.dispatch.OnComplete.internal(Future.scala:261)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>     at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>     at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
>     at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://[email protected]:52041/user/rpc/taskmanager_0#-1397184270]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>     at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>     at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>     ... 1 more
>
> Can somebody help with this? Why is this code failing? Is our approach
> scalable, or is there a better way of doing this? Considering that every
> CEP operator creates a thread, will this work in production with so many
> threads per task slot? Does the CEP library support combining multiple
> patterns in a single operator/thread?
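
To make the last question concrete, here is a minimal plain-Java sketch of what "many rules in one operator" could look like. It is not the Flink CEP API and not something the ticket confirms. `RuleSpec` and `MultiRuleMatcher` are hypothetical names, and the matching is deliberately simplified: the optional "middle" step is dropped, and only the first open "start" per org and rule is tracked. The point is only that one loop over a rule list, with state kept per (orgId, rule), can stand in for one operator per rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical rule: a "start" process later followed by an "end" process. */
final class RuleSpec {
    final String name;
    final String start;
    final String end;

    RuleSpec(String name, String start, String end) {
        this.name = name;
        this.start = start;
        this.end = end;
    }
}

/** Checks every rule against each event in a single pass. */
final class MultiRuleMatcher {
    private final List<RuleSpec> rules;
    // orgId -> (rule name -> id of the event that opened a partial match)
    private final Map<String, Map<String, String>> openStarts = new HashMap<>();

    MultiRuleMatcher(List<RuleSpec> rules) {
        this.rules = new ArrayList<>(rules);
    }

    /** Feeds one event and returns one alert per rule that this event completes. */
    List<String> onEvent(String orgId, String eventId, String process) {
        Map<String, String> perOrg =
            openStarts.computeIfAbsent(orgId, k -> new HashMap<>());
        List<String> alerts = new ArrayList<>();
        for (RuleSpec rule : rules) {
            String startId = perOrg.get(rule.name);
            if (startId != null && process.equals(rule.end)) {
                // The open match for this org and rule is completed by this event.
                alerts.add(rule.name + ":" + startId + "->" + eventId);
                perOrg.remove(rule.name);
            } else if (startId == null && process.equals(rule.start)) {
                // Remember the first start event until an end event arrives.
                perOrg.put(rule.name, eventId);
            }
        }
        return alerts;
    }
}
```

In a Flink job, logic of this shape would presumably live inside a single keyed function with the per-org map held in keyed state, but that wiring is an assumption and is not shown here.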