[ 
https://issues.apache.org/jira/browse/FLINK-20066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-20066:
----------------------------
    Priority: Critical  (was: Major)

> BatchPandasUDAFITTests.test_group_aggregate_with_aux_group unstable
> -------------------------------------------------------------------
>
>                 Key: FLINK-20066
>                 URL: https://issues.apache.org/jira/browse/FLINK-20066
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.12.0
>            Reporter: Dian Fu
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9361&view=logs&j=bdd9ea51-4de2-506a-d4d9-f3930e4d2355&t=98717c4f-b888-5636-bb1e-db7aca25755e
> {code}
> 2020-11-09T23:41:41.8853547Z =================================== FAILURES 
> ===================================
> 2020-11-09T23:41:41.8854000Z __________ 
> BatchPandasUDAFITTests.test_group_aggregate_with_aux_group __________
> 2020-11-09T23:41:41.8854324Z 
> 2020-11-09T23:41:41.8854647Z self = 
> <pyflink.table.tests.test_pandas_udaf.BatchPandasUDAFITTests 
> testMethod=test_group_aggregate_with_aux_group>
> 2020-11-09T23:41:41.8854956Z 
> 2020-11-09T23:41:41.8855205Z     def 
> test_group_aggregate_with_aux_group(self):
> 2020-11-09T23:41:41.8855521Z         t = self.t_env.from_elements(
> 2020-11-09T23:41:41.8858372Z             [(1, 2, 3), (3, 2, 3), (2, 1, 3), 
> (1, 5, 4), (1, 8, 6), (2, 3, 4)],
> 2020-11-09T23:41:41.8858807Z             DataTypes.ROW(
> 2020-11-09T23:41:41.8859091Z                 [DataTypes.FIELD("a", 
> DataTypes.TINYINT()),
> 2020-11-09T23:41:41.8859453Z                  DataTypes.FIELD("b", 
> DataTypes.SMALLINT()),
> 2020-11-09T23:41:41.8859783Z                  DataTypes.FIELD("c", 
> DataTypes.INT())]))
> 2020-11-09T23:41:41.8860012Z     
> 2020-11-09T23:41:41.8860255Z         table_sink = 
> source_sink_utils.TestAppendSink(
> 2020-11-09T23:41:41.8863051Z             ['a', 'b', 'c', 'd'],
> 2020-11-09T23:41:41.8863523Z             [DataTypes.TINYINT(), 
> DataTypes.INT(), DataTypes.FLOAT(), DataTypes.INT()])
> 2020-11-09T23:41:41.8864048Z         
> self.t_env.register_table_sink("Results", table_sink)
> 2020-11-09T23:41:41.8864715Z         
> self.t_env.get_config().get_configuration().set_string('python.metric.enabled',
>  'true')
> 2020-11-09T23:41:41.8865161Z         self.t_env.register_function("max_add", 
> udaf(MaxAdd(),
> 2020-11-09T23:41:41.8865573Z                                                  
>     result_type=DataTypes.INT(),
> 2020-11-09T23:41:41.8865999Z                                                  
>     func_type="pandas"))
> 2020-11-09T23:41:41.8866426Z         
> self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
> 2020-11-09T23:41:41.8866759Z         t.group_by("a") \
> 2020-11-09T23:41:41.8867052Z             .select("a, a + 1 as b, a + 2 as c") 
> \
> 2020-11-09T23:41:41.8867352Z             .group_by("a, b") \
> 2020-11-09T23:41:41.8867660Z             .select("a, b, mean_udaf(b), 
> max_add(b, c, 1)") \
> 2020-11-09T23:41:41.8868026Z >           .execute_insert("Results") \
> 2020-11-09T23:41:41.8868293Z             .wait()
> 2020-11-09T23:41:41.8868446Z 
> 2020-11-09T23:41:41.8868704Z pyflink/table/tests/test_pandas_udaf.py:95: 
> 2020-11-09T23:41:41.8869077Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 2020-11-09T23:41:41.8869464Z pyflink/table/table_result.py:76: in wait
> 2020-11-09T23:41:41.8870150Z     get_method(self._j_table_result, "await")()
> 2020-11-09T23:41:41.8870850Z 
> .tox/py37-cython/lib/python3.7/site-packages/py4j/java_gateway.py:1286: in 
> __call__
> 2020-11-09T23:41:41.8871415Z     answer, self.gateway_client, self.target_id, 
> self.name)
> 2020-11-09T23:41:41.8871768Z pyflink/util/exceptions.py:147: in deco
> 2020-11-09T23:41:41.8872032Z     return f(*a, **kw)
> 2020-11-09T23:41:41.8872378Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 2020-11-09T23:41:41.8872629Z 
> 2020-11-09T23:41:41.8872978Z answer = 'xro24238'
> 2020-11-09T23:41:41.8873296Z gateway_client = 
> <py4j.java_gateway.GatewayClient object at 0x7f0be26f2358>
> 2020-11-09T23:41:41.8873792Z target_id = 'o24237', name = 'await'
> 2020-11-09T23:41:41.8874097Z 
> 2020-11-09T23:41:41.8874433Z     def get_return_value(answer, gateway_client, 
> target_id=None, name=None):
> 2020-11-09T23:41:41.8874893Z         """Converts an answer received from the 
> Java gateway into a Python object.
> 2020-11-09T23:41:41.8875212Z     
> 2020-11-09T23:41:41.8875515Z         For example, string representation of 
> integers are converted to Python
> 2020-11-09T23:41:41.8875922Z         integer, string representation of 
> objects are converted to JavaObject
> 2020-11-09T23:41:41.8876255Z         instances, etc.
> 2020-11-09T23:41:41.8876584Z     
> 2020-11-09T23:41:41.8876873Z         :param answer: the string returned by 
> the Java gateway
> 2020-11-09T23:41:41.8877307Z         :param gateway_client: the gateway 
> client used to communicate with the Java
> 2020-11-09T23:41:41.8877749Z             Gateway. Only necessary if the 
> answer is a reference (e.g., object,
> 2020-11-09T23:41:41.8878126Z             list, map)
> 2020-11-09T23:41:41.8878646Z         :param target_id: the name of the object 
> from which the answer comes from
> 2020-11-09T23:41:41.8879154Z             (e.g., *object1* in 
> `object1.hello()`). Optional.
> 2020-11-09T23:41:41.8879679Z         :param name: the name of the member from 
> which the answer comes from
> 2020-11-09T23:41:41.8880097Z             (e.g., *hello* in 
> `object1.hello()`). Optional.
> 2020-11-09T23:41:41.8880371Z         """
> 2020-11-09T23:41:41.8880621Z         if is_error(answer)[0]:
> 2020-11-09T23:41:41.8881088Z             if len(answer) > 1:
> 2020-11-09T23:41:41.8881457Z                 type = answer[1]
> 2020-11-09T23:41:41.8881967Z                 value = 
> OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> 2020-11-09T23:41:41.8882417Z                 if answer[1] == REFERENCE_TYPE:
> 2020-11-09T23:41:41.8882754Z                     raise Py4JJavaError(
> 2020-11-09T23:41:41.8883113Z                         "An error occurred while 
> calling {0}{1}{2}.\n".
> 2020-11-09T23:41:41.8883490Z >                       format(target_id, ".", 
> name), value)
> 2020-11-09T23:41:41.8883935Z E                   py4j.protocol.Py4JJavaError: 
> An error occurred while calling o24237.await.
> 2020-11-09T23:41:41.8884552Z E                   : 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> 2020-11-09T23:41:41.8885193Z E                        at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-11-09T23:41:41.8885796Z E                        at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-09T23:41:41.8886595Z E                        at 
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119)
> 2020-11-09T23:41:41.8887337Z E                        at 
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
> 2020-11-09T23:41:41.8887976Z E                        at 
> sun.reflect.GeneratedMethodAccessor225.invoke(Unknown Source)
> 2020-11-09T23:41:41.8888520Z E                        at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-09T23:41:41.8889053Z E                        at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-09T23:41:41.8889688Z E                        at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 2020-11-09T23:41:41.8890493Z E                        at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 2020-11-09T23:41:41.8891500Z E                        at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> 2020-11-09T23:41:41.8892134Z E                        at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 2020-11-09T23:41:41.8892797Z E                        at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> 2020-11-09T23:41:41.8893443Z E                        at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> 2020-11-09T23:41:41.8893959Z E                        at 
> java.lang.Thread.run(Thread.java:748)
> 2020-11-09T23:41:41.8894452Z E                   Caused by: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> 2020-11-09T23:41:41.8895065Z E                        at 
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:59)
> 2020-11-09T23:41:41.8895748Z E                        at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
> 2020-11-09T23:41:41.8896497Z E                        at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:368)
> 2020-11-09T23:41:41.8897328Z E                        at 
> org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:107)
> 2020-11-09T23:41:41.8897954Z E                        at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
> 2020-11-09T23:41:41.8898554Z E                        at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-11-09T23:41:41.8899153Z E                        at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-11-09T23:41:41.8899678Z E                        ... 1 more
> 2020-11-09T23:41:41.8900197Z E                   Caused by: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-11-09T23:41:41.8900946Z E                        at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-11-09T23:41:41.8901597Z E                        at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-09T23:41:41.8902192Z E                        at 
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:57)
> 2020-11-09T23:41:41.8902668Z E                        ... 7 more
> 2020-11-09T23:41:41.8903107Z E                   Caused by: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-11-09T23:41:41.8903704Z E                        at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-11-09T23:41:41.8904393Z E                        at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
> 2020-11-09T23:41:41.8905066Z E                        at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2020-11-09T23:41:41.8905647Z E                        at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2020-11-09T23:41:41.8906253Z E                        at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2020-11-09T23:41:41.8906842Z E                        at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2020-11-09T23:41:41.8907475Z E                        at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
> 2020-11-09T23:41:41.8908136Z E                        at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2020-11-09T23:41:41.8908757Z E                        at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2020-11-09T23:41:41.8909434Z E                        at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2020-11-09T23:41:41.8910095Z E                        at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2020-11-09T23:41:41.8910691Z E                        at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
> 2020-11-09T23:41:41.8911343Z E                        at 
> akka.dispatch.OnComplete.internal(Future.scala:264)
> 2020-11-09T23:41:41.8911826Z E                        at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2020-11-09T23:41:41.8912326Z E                        at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2020-11-09T23:41:41.8912822Z E                        at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2020-11-09T23:41:41.8913336Z E                        at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2020-11-09T23:41:41.8914205Z E                        at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> 2020-11-09T23:41:41.8914918Z E                        at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> 2020-11-09T23:41:41.8915503Z E                        at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> 2020-11-09T23:41:41.8916158Z E                        at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> 2020-11-09T23:41:41.8916723Z E                        at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 2020-11-09T23:41:41.8917388Z E                        at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2020-11-09T23:41:41.8917992Z E                        at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> 2020-11-09T23:41:41.8918539Z E                        at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> 2020-11-09T23:41:41.8919041Z E                        at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2020-11-09T23:41:41.8919696Z E                        at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2020-11-09T23:41:41.8920328Z E                        at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> 2020-11-09T23:41:41.8921129Z E                        at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2020-11-09T23:41:41.8921790Z E                        at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2020-11-09T23:41:41.8922396Z E                        at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> 2020-11-09T23:41:41.8922951Z E                        at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> 2020-11-09T23:41:41.8923502Z E                        at 
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 2020-11-09T23:41:41.8924115Z E                        at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 2020-11-09T23:41:41.8924715Z E                        at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2020-11-09T23:41:41.8925275Z E                        at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2020-11-09T23:41:41.8925845Z E                        at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2020-11-09T23:41:41.8926402Z E                        at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-11-09T23:41:41.8927017Z E                   Caused by: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> 2020-11-09T23:41:41.8927737Z E                        at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> 2020-11-09T23:41:41.8928609Z E                        at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> 2020-11-09T23:41:41.8929366Z E                        at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
> 2020-11-09T23:41:41.8930124Z E                        at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
> 2020-11-09T23:41:41.8930893Z E                        at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
> 2020-11-09T23:41:41.8931631Z E                        at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
> 2020-11-09T23:41:41.8932264Z E                        at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> 2020-11-09T23:41:41.8932883Z E                        at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
> 2020-11-09T23:41:41.8933389Z E                        at 
> sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
> 2020-11-09T23:41:41.8933910Z E                        at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-09T23:41:41.8934575Z E                        at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-09T23:41:41.8935103Z E                        at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
> 2020-11-09T23:41:41.8935719Z E                        at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
> 2020-11-09T23:41:41.8936349Z E                        at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> 2020-11-09T23:41:41.8936952Z E                        at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> 2020-11-09T23:41:41.8937525Z E                        at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 2020-11-09T23:41:41.8938053Z E                        at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 2020-11-09T23:41:41.8938571Z E                        at 
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 2020-11-09T23:41:41.8939116Z E                        at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 2020-11-09T23:41:41.8939747Z E                        at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 2020-11-09T23:41:41.8940259Z E                        at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2020-11-09T23:41:41.8940844Z E                        at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2020-11-09T23:41:41.8941682Z E                        at 
> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 2020-11-09T23:41:41.8942198Z E                        at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 2020-11-09T23:41:41.8942739Z E                        at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 2020-11-09T23:41:41.8943237Z E                        at 
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 2020-11-09T23:41:41.8943713Z E                        at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 2020-11-09T23:41:41.8944207Z E                        at 
> akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 2020-11-09T23:41:41.8944660Z E                        at 
> akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 2020-11-09T23:41:41.8945007Z E                        ... 4 more
> 2020-11-09T23:41:41.8945402Z E                   Caused by: 
> java.lang.RuntimeException: Failed to close remote bundle
> 2020-11-09T23:41:41.8946046Z E                        at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
> 2020-11-09T23:41:41.8946794Z E                        at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
> 2020-11-09T23:41:41.8947691Z E                        at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
> 2020-11-09T23:41:41.8948546Z E                        at 
> org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.endInput(AbstractOneInputPythonFunctionOperator.java:42)
> 2020-11-09T23:41:41.8949613Z E                        at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:95)
> 2020-11-09T23:41:41.8950643Z E                        at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
> 2020-11-09T23:41:41.8951667Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
> 2020-11-09T23:41:41.8952406Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
> 2020-11-09T23:41:41.8953139Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> 2020-11-09T23:41:41.8954086Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
> 2020-11-09T23:41:41.8954847Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> 2020-11-09T23:41:41.8955495Z E                        at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
> 2020-11-09T23:41:41.8956082Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
> 2020-11-09T23:41:41.8956660Z E                        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
> 2020-11-09T23:41:41.8957218Z E                        at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> 2020-11-09T23:41:41.8957747Z E                        at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> 2020-11-09T23:41:41.8958311Z E                        at 
> java.lang.Thread.run(Thread.java:748)
> 2020-11-09T23:41:41.8959085Z E                   Caused by: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> received from SDK harness for instruction 2: Traceback (most recent call 
> last):
> 2020-11-09T23:41:41.8960839Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 253, in _execute
> 2020-11-09T23:41:41.8961406Z E                       response = task()
> 2020-11-09T23:41:41.8962160Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 310, in <lambda>
> 2020-11-09T23:41:41.8962733Z E                       lambda: 
> self.create_worker().do_instruction(request), request)
> 2020-11-09T23:41:41.8963624Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 480, in do_instruction
> 2020-11-09T23:41:41.8964156Z E                       getattr(request, 
> request_type), request.instruction_id)
> 2020-11-09T23:41:41.8964947Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 515, in process_bundle
> 2020-11-09T23:41:41.8965477Z E                       
> bundle_processor.process_bundle(instruction_id))
> 2020-11-09T23:41:41.8966276Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 978, in process_bundle
> 2020-11-09T23:41:41.8966767Z E                       element.data)
> 2020-11-09T23:41:41.8967636Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 218, in process_encoded
> 2020-11-09T23:41:41.8968489Z E                       
> self.output(decoded_value)
> 2020-11-09T23:41:41.8968975Z E                     File 
> "apache_beam/runners/worker/operations.py", line 330, in 
> apache_beam.runners.worker.operations.Operation.output
> 2020-11-09T23:41:41.8969676Z E                     File 
> "apache_beam/runners/worker/operations.py", line 332, in 
> apache_beam.runners.worker.operations.Operation.output
> 2020-11-09T23:41:41.8970276Z E                     File 
> "apache_beam/runners/worker/operations.py", line 195, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> 2020-11-09T23:41:41.8971101Z E                     File 
> "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in 
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> 2020-11-09T23:41:41.8971649Z E                       with 
> self.scoped_process_state:
> 2020-11-09T23:41:41.8972182Z E                     File 
> "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 75, in 
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> 2020-11-09T23:41:41.8972711Z E                       self.func(value), 
> self.consumer.output_stream, True)
> 2020-11-09T23:41:41.8973182Z E                     File "<string>", line 1, 
> in <lambda>
> 2020-11-09T23:41:41.8973840Z E                     File 
> "/__w/2/s/flink-python/pyflink/table/udf.py", line 269, in eval
> 2020-11-09T23:41:41.8974254Z E                       return 
> self.func.get_value(accumulator)
> 2020-11-09T23:41:41.8974892Z E                     File 
> "/__w/2/s/flink-python/pyflink/table/tests/test_pandas_udaf.py", line 709, in 
> get_value
> 2020-11-09T23:41:41.8975341Z E                       
> self.assertEqual(self.counter_sum, self.counter.get_count())
> 2020-11-09T23:41:41.8976040Z E                     File 
> "/__w/2/s/flink-python/dev/.conda/lib/python3.7/unittest/case.py", line 839, 
> in assertEqual
> 2020-11-09T23:41:41.8976492Z E                       assertion_func(first, 
> second, msg=msg)
> 2020-11-09T23:41:41.8977147Z E                     File 
> "/__w/2/s/flink-python/dev/.conda/lib/python3.7/unittest/case.py", line 832, 
> in _baseAssertEqual
> 2020-11-09T23:41:41.8977616Z E                       raise 
> self.failureException(msg)
> 2020-11-09T23:41:41.8977964Z E                   AssertionError: 30 != 10
> 2020-11-09T23:41:41.8978238Z E                   
> 2020-11-09T23:41:41.8978633Z E                        at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-11-09T23:41:41.8979293Z E                        at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-09T23:41:41.8979986Z E                        at 
> org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> 2020-11-09T23:41:41.8980619Z E                        at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
> 2020-11-09T23:41:41.8981558Z E                        at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
> 2020-11-09T23:41:41.8982319Z E                        at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
> 2020-11-09T23:41:41.8982840Z E                        ... 16 more
> 2020-11-09T23:41:41.8983356Z E                   Caused by: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 2: Traceback (most recent call last):
> 2020-11-09T23:41:41.8984329Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 253, in _execute
> 2020-11-09T23:41:41.8984816Z E                       response = task()
> 2020-11-09T23:41:41.8985619Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 310, in <lambda>
> 2020-11-09T23:41:41.8986330Z E                       lambda: 
> self.create_worker().do_instruction(request), request)
> 2020-11-09T23:41:41.8987155Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 480, in do_instruction
> 2020-11-09T23:41:41.8987713Z E                       getattr(request, 
> request_type), request.instruction_id)
> 2020-11-09T23:41:41.8988500Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 515, in process_bundle
> 2020-11-09T23:41:41.8989029Z E                       
> bundle_processor.process_bundle(instruction_id))
> 2020-11-09T23:41:41.8989883Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 978, in process_bundle
> 2020-11-09T23:41:41.8990359Z E                       element.data)
> 2020-11-09T23:41:41.8991201Z E                     File 
> "/__w/2/s/flink-python/.tox/py37-cython/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 218, in process_encoded
> 2020-11-09T23:41:41.8991707Z E                       
> self.output(decoded_value)
> 2020-11-09T23:41:41.8992425Z E                     File 
> "apache_beam/runners/worker/operations.py", line 330, in 
> apache_beam.runners.worker.operations.Operation.output
> 2020-11-09T23:41:41.8993023Z E                     File 
> "apache_beam/runners/worker/operations.py", line 332, in 
> apache_beam.runners.worker.operations.Operation.output
> 2020-11-09T23:41:41.8993656Z E                     File 
> "apache_beam/runners/worker/operations.py", line 195, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> 2020-11-09T23:41:41.8994447Z E                     File 
> "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in 
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> 2020-11-09T23:41:41.8995170Z E                       with 
> self.scoped_process_state:
> 2020-11-09T23:41:41.8995838Z E                     File 
> "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 75, in 
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> 2020-11-09T23:41:41.8996449Z E                       self.func(value), 
> self.consumer.output_stream, True)
> 2020-11-09T23:41:41.8996844Z E                     File "<string>", line 1, 
> in <lambda>
> 2020-11-09T23:41:41.8997535Z E                     File 
> "/__w/2/s/flink-python/pyflink/table/udf.py", line 269, in eval
> 2020-11-09T23:41:41.8997963Z E                       return 
> self.func.get_value(accumulator)
> 2020-11-09T23:41:41.8998633Z E                     File 
> "/__w/2/s/flink-python/pyflink/table/tests/test_pandas_udaf.py", line 709, in 
> get_value
> 2020-11-09T23:41:41.8999200Z E                       
> self.assertEqual(self.counter_sum, self.counter.get_count())
> 2020-11-09T23:41:41.9000317Z E                     File 
> "/__w/2/s/flink-python/dev/.conda/lib/python3.7/unittest/case.py", line 839, 
> in assertEqual
> 2020-11-09T23:41:41.9001165Z E                       assertion_func(first, 
> second, msg=msg)
> 2020-11-09T23:41:41.9001865Z E                     File 
> "/__w/2/s/flink-python/dev/.conda/lib/python3.7/unittest/case.py", line 832, 
> in _baseAssertEqual
> 2020-11-09T23:41:41.9002396Z E                       raise 
> self.failureException(msg)
> 2020-11-09T23:41:41.9002865Z E                   AssertionError: 30 != 10
> 2020-11-09T23:41:41.9003240Z E                   
> 2020-11-09T23:41:41.9003822Z E                        at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
> 2020-11-09T23:41:41.9004735Z E                        at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
> 2020-11-09T23:41:41.9005621Z E                        at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
> 2020-11-09T23:41:41.9006612Z E                        at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
> 2020-11-09T23:41:41.9007519Z E                        at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
> 2020-11-09T23:41:41.9008389Z E                        at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
> 2020-11-09T23:41:41.9009224Z E                        at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
> 2020-11-09T23:41:41.9010178Z E                        at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
> 2020-11-09T23:41:41.9011130Z E                        at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 2020-11-09T23:41:41.9012068Z E                        at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> 2020-11-09T23:41:41.9012702Z E                        at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-11-09T23:41:41.9013380Z E                        at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-11-09T23:41:41.9013805Z E                        ... 1 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to