[ 
https://issues.apache.org/jira/browse/FLINK-20874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-20874.
---------------------------
    Resolution: Cannot Reproduce

Closing this ticket for now, as the failure has not recurred for almost half a year.

> Python DataStreamTests.test_key_by_on_connect_stream test failed with 
> "ArrayIndexOutOfBoundsException"
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20874
>                 URL: https://issues.apache.org/jira/browse/FLINK-20874
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.13.0
>            Reporter: Huang Xingbo
>            Priority: Minor
>              Labels: auto-deprioritized-major, test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=11704&view=logs&j=e92ecf6d-e207-5a42-7ff7-528ff0c5b259&t=d59eb898-29f7-5a99-91a7-b2dfc3e8a653]
> {code:java}
> 2021-01-07T00:00:50.7664267Z =================================== FAILURES 
> ===================================
> 2021-01-07T00:00:50.7664744Z ________________ 
> DataStreamTests.test_key_by_on_connect_stream _________________
> 2021-01-07T00:00:50.7665023Z 
> 2021-01-07T00:00:50.7665343Z self = 
> <pyflink.datastream.tests.test_data_stream.DataStreamTests 
> testMethod=test_key_by_on_connect_stream>
> 2021-01-07T00:00:50.7665648Z 
> 2021-01-07T00:00:50.7669970Z     def test_key_by_on_connect_stream(self):
> 2021-01-07T00:00:50.7671398Z         ds1 = self.env.from_collection([('a', 
> 0), ('b', 0), ('c', 1), ('d', 1), ('e', 2)],
> 2021-01-07T00:00:50.7672001Z                                        
> type_info=Types.ROW([Types.STRING(), Types.INT()])) \
> 2021-01-07T00:00:50.7672425Z             .key_by(MyKeySelector(), 
> key_type_info=Types.INT())
> 2021-01-07T00:00:50.7673082Z         ds2 = self.env.from_collection([('a', 
> 0), ('b', 0), ('c', 1), ('d', 1), ('e', 2)],
> 2021-01-07T00:00:50.7673552Z                                        
> type_info=Types.ROW([Types.STRING(), Types.INT()]))
> 2021-01-07T00:00:50.7673841Z     
> 2021-01-07T00:00:50.7674120Z         class 
> AssertKeyCoMapFunction(CoMapFunction):
> 2021-01-07T00:00:50.7674440Z             def __init__(self):
> 2021-01-07T00:00:50.7674725Z                 self.pre1 = None
> 2021-01-07T00:00:50.7674985Z                 self.pre2 = None
> 2021-01-07T00:00:50.7675189Z     
> 2021-01-07T00:00:50.7675429Z             def map1(self, value):
> 2021-01-07T00:00:50.7675888Z                 if value[0] == 'b':
> 2021-01-07T00:00:50.7676321Z                     assert self.pre1 == 'a'
> 2021-01-07T00:00:50.7676761Z                 if value[0] == 'd':
> 2021-01-07T00:00:50.7677190Z                     assert self.pre1 == 'c'
> 2021-01-07T00:00:50.7677483Z                 self.pre1 = value[0]
> 2021-01-07T00:00:50.7677752Z                 return value
> 2021-01-07T00:00:50.7677951Z     
> 2021-01-07T00:00:50.7678190Z             def map2(self, value):
> 2021-01-07T00:00:50.7679034Z                 if value[0] == 'b':
> 2021-01-07T00:00:50.7679468Z                     assert self.pre2 == 'a'
> 2021-01-07T00:00:50.7679910Z                 if value[0] == 'd':
> 2021-01-07T00:00:50.7680358Z                     assert self.pre2 == 'c'
> 2021-01-07T00:00:50.7680632Z                 self.pre2 = value[0]
> 2021-01-07T00:00:50.7680895Z                 return value
> 2021-01-07T00:00:50.7681289Z     
> 2021-01-07T00:00:50.7681581Z         ds1.connect(ds2)\
> 2021-01-07T00:00:50.7681918Z             .key_by(MyKeySelector(), 
> MyKeySelector(), key_type_info=Types.INT())\
> 2021-01-07T00:00:50.7682271Z             .map(AssertKeyCoMapFunction())\
> 2021-01-07T00:00:50.7682561Z             .add_sink(self.test_sink)
> 2021-01-07T00:00:50.7682785Z     
> 2021-01-07T00:00:50.7683170Z >       self.env.execute('key_by_test')
> 2021-01-07T00:00:50.7683433Z 
> 2021-01-07T00:00:50.7683692Z 
> pyflink/datastream/tests/test_data_stream.py:209: 
> 2021-01-07T00:00:50.7684056Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 2021-01-07T00:00:50.7684475Z 
> pyflink/datastream/stream_execution_environment.py:622: in execute
> 2021-01-07T00:00:50.7684912Z     return 
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
> 2021-01-07T00:00:50.7685548Z 
> .tox/py37-cython/lib/python3.7/site-packages/py4j/java_gateway.py:1286: in 
> __call__
> 2021-01-07T00:00:50.7685952Z     answer, self.gateway_client, self.target_id, 
> self.name)
> 2021-01-07T00:00:50.7686263Z pyflink/util/exceptions.py:147: in deco
> 2021-01-07T00:00:50.7686531Z     return f(*a, **kw)
> 2021-01-07T00:00:50.7686854Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 2021-01-07T00:00:50.7687094Z 
> 2021-01-07T00:00:50.7687414Z answer = 'xro3160'
> 2021-01-07T00:00:50.7687713Z gateway_client = 
> <py4j.java_gateway.GatewayClient object at 0x7fbabfe34860>
> 2021-01-07T00:00:50.7688355Z target_id = 'o2910', name = 'execute'
> 2021-01-07T00:00:50.7688618Z 
> 2021-01-07T00:00:50.7688940Z     def get_return_value(answer, gateway_client, 
> target_id=None, name=None):
> 2021-01-07T00:00:50.7689397Z         """Converts an answer received from the 
> Java gateway into a Python object.
> 2021-01-07T00:00:50.7689685Z     
> 2021-01-07T00:00:50.7689979Z         For example, string representation of 
> integers are converted to Python
> 2021-01-07T00:00:50.7690389Z         integer, string representation of 
> objects are converted to JavaObject
> 2021-01-07T00:00:50.7690695Z         instances, etc.
> 2021-01-07T00:00:50.7690903Z     
> 2021-01-07T00:00:50.7691345Z         :param answer: the string returned by 
> the Java gateway
> 2021-01-07T00:00:50.7691840Z         :param gateway_client: the gateway 
> client used to communicate with the Java
> 2021-01-07T00:00:50.7692279Z             Gateway. Only necessary if the 
> answer is a reference (e.g., object,
> 2021-01-07T00:00:50.7692592Z             list, map)
> 2021-01-07T00:00:50.7692940Z         :param target_id: the name of the object 
> from which the answer comes from
> 2021-01-07T00:00:50.7693343Z             (e.g., *object1* in 
> `object1.hello()`). Optional.
> 2021-01-07T00:00:50.7693721Z         :param name: the name of the member from 
> which the answer comes from
> 2021-01-07T00:00:50.7694110Z             (e.g., *hello* in 
> `object1.hello()`). Optional.
> 2021-01-07T00:00:50.7694384Z         """
> 2021-01-07T00:00:50.7694612Z         if is_error(answer)[0]:
> 2021-01-07T00:00:50.7694901Z             if len(answer) > 1:
> 2021-01-07T00:00:50.7695181Z                 type = answer[1]
> 2021-01-07T00:00:50.7695524Z                 value = 
> OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> 2021-01-07T00:00:50.7695928Z                 if answer[1] == REFERENCE_TYPE:
> 2021-01-07T00:00:50.7696229Z                     raise Py4JJavaError(
> 2021-01-07T00:00:50.7696573Z                         "An error occurred while 
> calling {0}{1}{2}.\n".
> 2021-01-07T00:00:50.7696950Z >                       format(target_id, ".", 
> name), value)
> 2021-01-07T00:00:50.7697368Z E                   py4j.protocol.Py4JJavaError: 
> An error occurred while calling o2910.execute.
> 2021-01-07T00:00:50.7698009Z E                   : 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-01-07T00:00:50.7698651Z E                        at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-01-07T00:00:50.7699296Z E                        at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> 2021-01-07T00:00:50.7699991Z E                        at 
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> 2021-01-07T00:00:50.7700590Z E                        at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> 2021-01-07T00:00:50.7701243Z E                        at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> 2021-01-07T00:00:50.7701911Z E                        at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
> 2021-01-07T00:00:50.7702556Z E                        at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> 2021-01-07T00:00:50.7703161Z E                        at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> 2021-01-07T00:00:50.7703746Z E                        at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> 2021-01-07T00:00:50.7704322Z E                        at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> 2021-01-07T00:00:50.7704898Z E                        at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> 2021-01-07T00:00:50.7705402Z E                        at 
> akka.dispatch.OnComplete.internal(Future.scala:264)
> 2021-01-07T00:00:50.7705956Z E                        at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-01-07T00:00:50.7706430Z E                        at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-01-07T00:00:50.7706900Z E                        at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-01-07T00:00:50.7707393Z E                        at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-01-07T00:00:50.7707964Z E                        at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> 2021-01-07T00:00:50.7708612Z E                        at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> 2021-01-07T00:00:50.7709172Z E                        at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> 2021-01-07T00:00:50.7709693Z E                        at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> 2021-01-07T00:00:50.7710230Z E                        at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 2021-01-07T00:00:50.7710871Z E                        at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2021-01-07T00:00:50.7711587Z E                        at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> 2021-01-07T00:00:50.7712109Z E                        at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> 2021-01-07T00:00:50.7712601Z E                        at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-01-07T00:00:50.7713149Z E                        at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2021-01-07T00:00:50.7713780Z E                        at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> 2021-01-07T00:00:50.7714402Z E                        at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2021-01-07T00:00:50.7715031Z E                        at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2021-01-07T00:00:50.7715606Z E                        at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> 2021-01-07T00:00:50.7716213Z E                        at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> 2021-01-07T00:00:50.7716752Z E                        at 
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 2021-01-07T00:00:50.7717335Z E                        at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 2021-01-07T00:00:50.7717905Z E                        at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2021-01-07T00:00:50.7718442Z E                        at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2021-01-07T00:00:50.7719053Z E                        at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2021-01-07T00:00:50.7719588Z E                        at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2021-01-07T00:00:50.7720178Z E                   Caused by: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> 2021-01-07T00:00:50.7720870Z E                        at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> 2021-01-07T00:00:50.7721757Z E                        at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> 2021-01-07T00:00:50.7722601Z E                        at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:230)
> 2021-01-07T00:00:50.7723235Z E                        at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:221)
> 2021-01-07T00:00:50.7723990Z E                        at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:212)
> 2021-01-07T00:00:50.7724669Z E                        at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:691)
> 2021-01-07T00:00:50.7725311Z E                        at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> 2021-01-07T00:00:50.7725909Z E                        at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
> 2021-01-07T00:00:50.7726438Z E                        at 
> jdk.internal.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 2021-01-07T00:00:50.7726995Z E                        at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-01-07T00:00:50.7727531Z E                        at 
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 2021-01-07T00:00:50.7728073Z E                        at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
> 2021-01-07T00:00:50.7728756Z E                        at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> 2021-01-07T00:00:50.7729369Z E                        at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> 2021-01-07T00:00:50.7729991Z E                        at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> 2021-01-07T00:00:50.7730535Z E                        at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 2021-01-07T00:00:50.7731089Z E                        at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 2021-01-07T00:00:50.7731663Z E                        at 
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 2021-01-07T00:00:50.7732183Z E                        at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 2021-01-07T00:00:50.7732693Z E                        at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 2021-01-07T00:00:50.7733212Z E                        at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2021-01-07T00:00:50.7733805Z E                        at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2021-01-07T00:00:50.7734282Z E                        at 
> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 2021-01-07T00:00:50.7734780Z E                        at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 2021-01-07T00:00:50.7735281Z E                        at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 2021-01-07T00:00:50.7735733Z E                        at 
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 2021-01-07T00:00:50.7736197Z E                        at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 2021-01-07T00:00:50.7736657Z E                        at 
> akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 2021-01-07T00:00:50.7737079Z E                        at 
> akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 2021-01-07T00:00:50.7737434Z E                        ... 4 more
> 2021-01-07T00:00:50.7737848Z E                   Caused by: 
> java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 0
> 2021-01-07T00:00:50.7738339Z E                        at 
> java.base/java.util.ArrayList.add(ArrayList.java:487)
> 2021-01-07T00:00:50.7738855Z E                        at 
> java.base/java.util.ArrayList.add(ArrayList.java:499)
> 2021-01-07T00:00:50.7739464Z E                        at 
> org.apache.flink.streaming.api.runners.python.beam.PythonSharedResources.addPythonEnvironmentManager(PythonSharedResources.java:66)
> 2021-01-07T00:00:50.7740189Z E                        at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:253)
> 2021-01-07T00:00:50.7740921Z E                        at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:113)
> 2021-01-07T00:00:50.7741851Z E                        at 
> org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator.open(OneInputPythonFunctionOperator.java:107)
> 2021-01-07T00:00:50.7742552Z E                        at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
> 2021-01-07T00:00:50.7743218Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:533)
> 2021-01-07T00:00:50.7743955Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> 2021-01-07T00:00:50.7744679Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:523)
> 2021-01-07T00:00:50.7745244Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:563)
> 2021-01-07T00:00:50.7745784Z E                        at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> 2021-01-07T00:00:50.7746285Z E                        at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> 2021-01-07T00:00:50.7746729Z E                        at 
> java.base/java.lang.Thread.run(Thread.java:834)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to